mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-20 11:38:28 -06:00
implement efficient concurrent thread state signaling
This commit is contained in:
parent
0de80887b1
commit
bf5a31b342
@ -28,6 +28,7 @@ import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
|
||||
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.api.engine.activityimpl.ParameterMap;
|
||||
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
|
||||
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
|
||||
|
||||
import java.io.InputStream;
|
||||
import java.io.PrintWriter;
|
||||
@ -215,4 +216,6 @@ public interface Activity extends Comparable<Activity>, ActivityDefObserver, Pro
|
||||
default int getHdrDigits() {
|
||||
return getParams().getOptionalInteger("hdr_digits").orElse(4);
|
||||
}
|
||||
|
||||
RunStateTally getRunStateTally();
|
||||
}
|
||||
|
@ -16,39 +16,69 @@
|
||||
|
||||
package io.nosqlbench.engine.api.activityapi.core;
|
||||
|
||||
import io.nosqlbench.engine.api.activityimpl.MotorState;
|
||||
|
||||
/**
|
||||
* <P>This enum indicates the state of a thread within an activity. The state is kept in an atomic
|
||||
* register. Ownership of state changes is shared between a supervising thread and a managed thread.
|
||||
* Both can make changes to the state.</P>
|
||||
*
|
||||
* <P>These states are ordered such that the highest ordinal state represents the most significant
|
||||
* aggregate state of all motors. That is, if any has errored, then the other states do not matter.
|
||||
* If any is finished, then stopped motors don't matter, and so on. This makes summarizing aggregate
|
||||
* state simply a matter of ordering.</P>
|
||||
*
|
||||
*/
|
||||
public enum RunState {
|
||||
|
||||
|
||||
/**
|
||||
* Initial state after creation of this control
|
||||
* Initial state after creation of a motor. This is the initial state upon instantiation of a motor, before
|
||||
* it is called on to do any active logic besides what occurs in the constructor.
|
||||
*/
|
||||
Uninitialized("i⌀"),
|
||||
|
||||
/**
|
||||
* This thread has been queued to run, but hasn't signaled yet that it is full started
|
||||
* This must be set by the executor before executing the slot runnable
|
||||
* A thread has been invoked, but is initializing and preparing for its main control loop.
|
||||
* This is signaled <EM>by the motor</EM> after {@link Runnable#run}, but before entering the main processing
|
||||
* loop.
|
||||
*/
|
||||
Starting("s⏫"),
|
||||
|
||||
/**
|
||||
* This thread is running. This should only be set by the controlled thread
|
||||
* A thread is iterating within the main control loop.
|
||||
* This is signaled <EM>by the motor</EM> once initialization in the main loop is complete and immediately
|
||||
* before it enters it's main processing loop.
|
||||
*/
|
||||
Running("R\u23F5"),
|
||||
|
||||
/**
|
||||
* This thread has completed all of its activity, and will do no further work without new input
|
||||
*/
|
||||
Finished("F⏯"),
|
||||
|
||||
/**
|
||||
* The thread has been requested to stop. This says nothing of the internal state.
|
||||
* <P>The thread has been requested to stop. This can be set by a managing thread which is not the
|
||||
* motor thread, or by the motor thread. In either case, the motor thread is required to observe changes to this and initiate shutdown.</P>
|
||||
*/
|
||||
Stopping("s⏬"),
|
||||
|
||||
/**
|
||||
* The thread has stopped. This should only be set by the controlled thread
|
||||
* The thread has stopped. This should only be set by the motor. This state will only be visible
|
||||
* to signaling mechanisms so long as the motor is still managed.
|
||||
*
|
||||
* <P>NOTE: When a motor is stopped or finished, its state will remain visible in state tracking until
|
||||
* {@link Motor#getState()}.{@link MotorState#removeState()} is called.</P>
|
||||
*/
|
||||
Stopped("_\u23F9");
|
||||
Stopped("e\u23F9"),
|
||||
|
||||
/**
|
||||
* <P>A thread has exhausted its supply of values on the input (AKA cycles), thus has completed its work.
|
||||
* This is signaled upon a short read of the input <EM>by the motor</EM>.</P>
|
||||
*
|
||||
* <P>NOTE: When a motor is stopped or finished, its state will remain visible in state tracking until
|
||||
* {@link Motor#getState()}.{@link MotorState#removeState()} is called.</P>
|
||||
*/
|
||||
Finished("F⏯"),
|
||||
|
||||
/**
|
||||
* If a motor has seen an exception, it goes into errored state before propagating the error.
|
||||
*/
|
||||
Errored("E⚠");
|
||||
|
||||
private final String runcode;
|
||||
|
||||
@ -69,15 +99,16 @@ public enum RunState {
|
||||
default -> false; // A motor was just created. This is its initial state.
|
||||
case Uninitialized, Stopped -> (target == Starting);
|
||||
case Starting -> switch (target) { // a motor has indicated that is in the run() method
|
||||
case Running, Finished -> true;// a motor has exhausted its input, and has declined to go into started mode
|
||||
case Running, Finished, Errored -> true;// a motor has exhausted its input, and has declined to go into started mode
|
||||
default -> false;
|
||||
};
|
||||
case Running -> switch (target) { // A request was made to stop the motor before it finished
|
||||
case Stopping, Finished -> true;// A motor has exhausted its input, and is finished with its work
|
||||
case Stopping, Finished, Errored -> true;// A motor has exhausted its input, and is finished with its work
|
||||
default -> false;
|
||||
};
|
||||
case Stopping -> (target == Stopped); // A motor was stopped by request before exhausting input
|
||||
case Stopping -> (target == Stopped||target==Finished); // A motor was stopped by request before exhausting input
|
||||
case Finished -> (target == Running); // A motor was restarted?
|
||||
case Errored -> target==Errored;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -17,26 +17,31 @@
|
||||
package io.nosqlbench.engine.api.activityimpl;
|
||||
|
||||
import io.nosqlbench.engine.api.activityapi.core.RunState;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
/**
|
||||
* Holds the state of a slot, allows only valid transitions, and shares the
|
||||
* slot state as
|
||||
*/
|
||||
public class SlotStateTracker {
|
||||
private final AtomicReference<RunState> slotState = new AtomicReference<>(RunState.Uninitialized);
|
||||
private final static Logger logger = LogManager.getLogger(SlotStateTracker.class);
|
||||
public class MotorState implements Supplier<RunState> {
|
||||
private final static Logger logger = LogManager.getLogger("MOTORS");
|
||||
private final AtomicReference<RunState> atomicState = new AtomicReference<>(RunState.Uninitialized);
|
||||
private final long slotId;
|
||||
private final RunStateTally tally;
|
||||
|
||||
public SlotStateTracker(long slotId) {
|
||||
public MotorState(long slotId, RunStateTally tally) {
|
||||
this.slotId = slotId;
|
||||
this.tally = tally;
|
||||
tally.add(atomicState.get());
|
||||
}
|
||||
|
||||
public RunState getSlotState() {
|
||||
return slotState.get();
|
||||
public RunState get() {
|
||||
return atomicState.get();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -46,7 +51,7 @@ public class SlotStateTracker {
|
||||
* @return an atomic reference for SlotState
|
||||
*/
|
||||
public AtomicReference<RunState> getAtomicSlotState() {
|
||||
return slotState;
|
||||
return atomicState;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -56,16 +61,19 @@ public class SlotStateTracker {
|
||||
* @param to The next SlotState for this thread/slot/motor
|
||||
*/
|
||||
public synchronized void enterState(RunState to) {
|
||||
RunState from = slotState.get();
|
||||
RunState from = atomicState.get();
|
||||
if (!from.canTransitionTo(to)) {
|
||||
throw new RuntimeException("Invalid transition from " + from + " to " + to);
|
||||
}
|
||||
while (!slotState.compareAndSet(from, to)) {
|
||||
while (!atomicState.compareAndSet(from, to)) {
|
||||
logger.trace("retrying transition from:" + from + " to:" + to);
|
||||
}
|
||||
tally.change(from,to);
|
||||
logger.trace("TRANSITION[" + slotId + "]: " + from + " ==> " + to);
|
||||
|
||||
}
|
||||
|
||||
|
||||
public void removeState() {
|
||||
logger.trace(() -> "Removing motor state " + atomicState.get());
|
||||
tally.remove(atomicState.get());
|
||||
}
|
||||
}
|
@ -36,6 +36,7 @@ import io.nosqlbench.engine.api.activityapi.ratelimits.RateSpec;
|
||||
import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
|
||||
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
|
||||
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.decorators.SyntheticOpTemplateProvider;
|
||||
import io.nosqlbench.engine.api.activityimpl.uniform.flowtypes.Op;
|
||||
@ -82,6 +83,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
|
||||
private NBErrorHandler errorHandler;
|
||||
private ActivityMetricProgressMeter progressMeter;
|
||||
private String workloadSource = "unspecified";
|
||||
private final RunStateTally tally = new RunStateTally();
|
||||
|
||||
public SimpleActivity(ActivityDef activityDef) {
|
||||
this.activityDef = activityDef;
|
||||
@ -95,7 +97,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
|
||||
} else {
|
||||
activityDef.getParams().set("alias",
|
||||
activityDef.getActivityType().toUpperCase(Locale.ROOT)
|
||||
+ String.valueOf(nameEnumerator++));
|
||||
+ nameEnumerator++);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -190,7 +192,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return getAlias();
|
||||
return getAlias()+":"+getRunState()+":"+getRunStateTally().toString();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -217,7 +219,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
|
||||
@Override
|
||||
public void closeAutoCloseables() {
|
||||
for (AutoCloseable closeable : closeables) {
|
||||
logger.debug("CLOSING " + closeable.getClass().getCanonicalName() + ": " + closeable.toString());
|
||||
logger.debug("CLOSING " + closeable.getClass().getCanonicalName() + ": " + closeable);
|
||||
try {
|
||||
closeable.close();
|
||||
} catch (Exception e) {
|
||||
@ -392,7 +394,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
|
||||
if (threadSpec.isPresent()) {
|
||||
String spec = threadSpec.get();
|
||||
int processors = Runtime.getRuntime().availableProcessors();
|
||||
if (spec.toLowerCase().equals("auto")) {
|
||||
if (spec.equalsIgnoreCase("auto")) {
|
||||
int threads = processors * 10;
|
||||
if (threads > activityDef.getCycleCount()) {
|
||||
threads = (int) activityDef.getCycleCount();
|
||||
@ -652,7 +654,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
|
||||
return stmtsDocList;
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new OpConfigError("Error loading op templates: " + e.toString(), workloadSource, e);
|
||||
throw new OpConfigError("Error loading op templates: " + e, workloadSource, e);
|
||||
}
|
||||
|
||||
}
|
||||
@ -677,6 +679,11 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
|
||||
return getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10);
|
||||
}
|
||||
|
||||
@Override
|
||||
public RunStateTally getRunStateTally() {
|
||||
return tally;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
|
@ -0,0 +1,71 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.engine.api.activityimpl.motor;
|
||||
|
||||
import io.nosqlbench.engine.api.activityapi.core.RunState;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
/**
|
||||
* A value type which encodes the atomic state of a RunState tally.
|
||||
*/
|
||||
public class RunStateImage {
|
||||
private final static Logger logger = LogManager.getLogger("TALLY");
|
||||
|
||||
private final int[] counts = new int[RunState.values().length];
|
||||
private final boolean timedout;
|
||||
|
||||
public RunStateImage(int[] counts, boolean timedout) {
|
||||
System.arraycopy(counts, 0, this.counts, 0, counts.length);
|
||||
this.timedout = timedout;
|
||||
}
|
||||
|
||||
public boolean isTimeout() {
|
||||
return this.timedout;
|
||||
}
|
||||
|
||||
public boolean is(RunState runState) {
|
||||
return counts[runState.ordinal()]>0;
|
||||
}
|
||||
|
||||
public boolean isOnly(RunState runState) {
|
||||
for (int i = 0; i < counts.length; i++) {
|
||||
if (counts[i]>0 && i!=runState.ordinal()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public RunState getMaxState() {
|
||||
for (int ord = counts.length-1; ord >= 0; ord--) {
|
||||
if (counts[ord]>0) {
|
||||
return RunState.values()[ord];
|
||||
}
|
||||
}
|
||||
throw new RuntimeException("There were zero states, so max state is undefined");
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (RunState runState : RunState.values()) {
|
||||
sb.append(runState.getCode()).append(" ").append(counts[runState.ordinal()]).append(" ");
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,266 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.engine.api.activityimpl.motor;
|
||||
|
||||
import io.nosqlbench.engine.api.activityapi.core.RunState;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
/**
|
||||
* <H2>Synopsis</H2>
|
||||
* <P>Event-oriented tally of the runtime states of all
|
||||
* motor threads. This is meant to be used as the definitive
|
||||
* aggregate scorecard for an activity's thread states.</P>
|
||||
* <HR></HR>
|
||||
* <H2>Purpose</H2>
|
||||
* <P>This limits inter-thread signaling requirements by constraining
|
||||
* the cases for which blockers are notified. This is because the
|
||||
* meaningful scenarios for which a blocker would want to be notified
|
||||
* do not include changes of positive values to other positive values.
|
||||
* This allows an explicit optimization around scenarios for which a
|
||||
* state count increments to 1 or decrements to 0, as in "some" or "none".
|
||||
* This is an effective optimization for scenarios with many active
|
||||
* threads.
|
||||
* </P>
|
||||
* <HR></HR>
|
||||
* <H2>Calling Semantics</H2>
|
||||
* <P>Callers of the await functions will block for the required condition or,
|
||||
* if specified, the timeout to occur without the condition.
|
||||
* These callers are unblocked atomically, after any state add, state remove,
|
||||
* or state change events are fully completed.</P>
|
||||
* <P>{@link RunStateImage} is returned from blocking methods so that
|
||||
* callers can know consistently what the current run states were at
|
||||
* the time their condition was met or timed out. Any callers of such
|
||||
* methods <EM>must</EM> check for whether the condition was successfully
|
||||
* met via {@link RunStateImage#isTimeout()}</P>
|
||||
* <HR></HR>
|
||||
* <H2>Invariants</H2>
|
||||
* <UL>
|
||||
* <LI>Under consistent usage patterns, all counts should be zero or positive at all times.</LI>
|
||||
* <LI>The current count for each state should be equal to the visible {@link RunState} on each
|
||||
* motor thread, as all transitions are made through the motor threads directly.</LI>
|
||||
* </UL>
|
||||
*/
|
||||
public class RunStateTally {
|
||||
private final static Logger logger = LogManager.getLogger("TALLY");
|
||||
|
||||
/**
|
||||
* If no timeout is given for any of the await methods, then the default is to wait for
|
||||
* approximately many eons. Some tests run until told to stop.
|
||||
*/
|
||||
public final long DEFAULT_TIMEOUT_MS=Long.MAX_VALUE;
|
||||
|
||||
private final int[] counts = new int[RunState.values().length];
|
||||
|
||||
/**
|
||||
* @return the current count for the specified state
|
||||
* @param runState The {@link RunState} to count
|
||||
*
|
||||
*/
|
||||
public synchronized int tallyFor(RunState runState) {
|
||||
return counts[runState.ordinal()];
|
||||
}
|
||||
|
||||
/**
|
||||
* Signal that a motor thread has left one {@link RunState} and entered another,
|
||||
* atomically. After both counts are updated, if any RunState counts changed to or from zero,
|
||||
* then all potential observers are notified to re-evaluate their await conditions.
|
||||
* @param from the prior RunState
|
||||
* @param to the next RunState
|
||||
*/
|
||||
public synchronized void change(RunState from, RunState to) {
|
||||
counts[from.ordinal()]--;
|
||||
counts[to.ordinal()]++;
|
||||
logger.trace(() -> this +" -"+from+ String.format(":%04d",counts[from.ordinal()])+ ", +"+to+ String.format(":%04d",counts[to.ordinal()]));
|
||||
|
||||
if (counts[from.ordinal()]==0 || counts[to.ordinal()]==1) {
|
||||
logger.debug(() -> "NOTIFYing on edge "+
|
||||
"from " + from + String.format(":%04d",counts[from.ordinal()]) +
|
||||
" to " + to + String.format(":%04d",counts[to.ordinal()]));
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a previously untracked motor thread to state tracking with the provided {@link RunState}.
|
||||
* If the given state's count becomes non-zero, then all potential observers are notified to re-evaluate
|
||||
* their await conditions.
|
||||
* @param state The initial tracking state for the related motor thread
|
||||
*/
|
||||
public synchronized void add(RunState state) {
|
||||
counts[state.ordinal()]++;
|
||||
logger.trace(() -> this +" +"+state+ String.format(":%04d",counts[state.ordinal()]));
|
||||
if (counts[state.ordinal()]==1) {
|
||||
logger.debug(() -> "NOTIFYing on ++-SOME edge for " + state + String.format(":%04d",counts[state.ordinal()]));
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a previously tracked motor thread from state tracking with the provided {@link RunState}.
|
||||
* If the given state's count becomes zero, then all potential observers are notified to re-evaluate
|
||||
* their await conditions.
|
||||
* @param state The final tracking state for the related motor thread
|
||||
*/
|
||||
public synchronized void remove(RunState state) {
|
||||
counts[state.ordinal()]--;
|
||||
logger.trace(() -> this +" -"+state+ String.format(":%04d",counts[state.ordinal()]));
|
||||
if (counts[state.ordinal()]==0) {
|
||||
logger.debug(() -> "NOTIFYing on 00-NONE edge for " + state + String.format(":%04d",counts[state.ordinal()]));
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Await until all states but the provided {@link RunState}s have zero counts.
|
||||
* This condition matches if the provided states have zero or positive counts <EM>if and only if</EM>
|
||||
* all other states have zero counts. Thus, if there are no positive values at all, this condition will
|
||||
* still match.
|
||||
* @param runStates The states which <EM>may</EM> have zero counts and still match the condition
|
||||
* @return A {@link RunStateImage}, indicating success or failure, and the view of states at the time of evaluation
|
||||
*/
|
||||
public synchronized RunStateImage awaitNoneOther(RunState... runStates) {
|
||||
return this.awaitNoneOther(DEFAULT_TIMEOUT_MS, runStates);
|
||||
}
|
||||
/**
|
||||
* Exactly like {@link #awaitNoneOther(long, RunState...)}, except that it allows for a timeout,
|
||||
* after which the method will unblock and signal an error if the await condition is still false.
|
||||
* @param runStates RunStates which are the only valid states before unblocking
|
||||
* @return A {@link RunStateImage}, indicating success or failure, and the view of states at the time of evaluation
|
||||
*/
|
||||
public synchronized RunStateImage awaitNoneOther(long timeoutMillis, RunState... runStates) {
|
||||
logger.debug(() -> "☐ Awaiting only " + Arrays.toString(runStates) + " for " + timeoutMillis+"ms");
|
||||
long timeoutAt = timeoutAt(timeoutMillis);
|
||||
|
||||
int sum=0;
|
||||
for (RunState runState: RunState.values()) {
|
||||
sum+=counts[runState.ordinal()];
|
||||
}
|
||||
for (RunState runState : runStates) {
|
||||
sum-=counts[runState.ordinal()];
|
||||
}
|
||||
while (sum>0 && System.currentTimeMillis()<timeoutAt) {
|
||||
try {
|
||||
wait(timeoutAt-System.currentTimeMillis());
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
|
||||
sum=0;
|
||||
for (RunState runState: RunState.values()) {
|
||||
sum+=counts[runState.ordinal()];
|
||||
}
|
||||
for (RunState runState : runStates) {
|
||||
sum-=counts[runState.ordinal()];
|
||||
}
|
||||
}
|
||||
|
||||
boolean timedout = (sum!=0);
|
||||
logger.debug(() -> (timedout ? "✘ TIMED-OUT awaiting only " : "☑ Awaited only " ) + toString(runStates));
|
||||
return new RunStateImage(this.counts,timedout);
|
||||
}
|
||||
|
||||
private long timeoutAt(long timeoutMillis) {
|
||||
long delayTill= System.currentTimeMillis() + timeoutMillis;
|
||||
return (delayTill>0) ? delayTill : Long.MAX_VALUE;
|
||||
}
|
||||
/**
|
||||
* Await until there are zero counts for all of the specified {@link RunState}s.
|
||||
* @param runStates all RunStates which must be zeroed before unblocking
|
||||
* @return A {@link RunStateImage}, indicating success or failure, and the view of states at the time of evaluation
|
||||
*/
|
||||
public synchronized RunStateImage awaitNoneOf(RunState... runStates) {
|
||||
return this.awaitNoneOf(DEFAULT_TIMEOUT_MS, runStates);
|
||||
}
|
||||
/**
|
||||
* Exactly like {@link #awaitNoneOf(RunState...)}, except that it allows for a timeout, after which the
|
||||
* method will unblock and signal a timeout if the await condition is still not met.
|
||||
* @param runStates all RunStates which must be zeroed before unblocking
|
||||
* @return A {@link RunStateImage}, indicating success or failure, and the view of states at the time of evaluation
|
||||
*/
|
||||
public synchronized RunStateImage awaitNoneOf(long timeoutMillis, RunState... runStates) {
|
||||
logger.debug(() -> "☐ Awaiting none of " + Arrays.toString(runStates)+ " for " + timeoutMillis+"ms");
|
||||
long timeoutAt = timeoutAt(timeoutMillis);
|
||||
|
||||
int sum=0;
|
||||
for (RunState runState : runStates) {
|
||||
sum+=counts[runState.ordinal()];
|
||||
}
|
||||
while (sum>0 && System.currentTimeMillis()<timeoutAt) {
|
||||
try {
|
||||
wait(timeoutAt-System.currentTimeMillis());
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
sum=0;
|
||||
for (RunState runState : runStates) {
|
||||
sum+=counts[runState.ordinal()];
|
||||
}
|
||||
}
|
||||
boolean timedout=sum==0;
|
||||
logger.debug(() -> (timedout ? "✘ TIMED-OUT awaiting none of " : "☑ Awaited none of " ) + toString(runStates));
|
||||
return new RunStateImage(this.counts,timedout);
|
||||
}
|
||||
|
||||
/**
|
||||
* Await until at least one of the provided runStates has a positive value.
|
||||
* @param runStates RunStates any of which allow unblocking
|
||||
* @return A {@link RunStateImage}, indicating success or failure, and the view of states at the time of evaluation
|
||||
*/
|
||||
public synchronized RunStateImage awaitAny(RunState... runStates) {
|
||||
return this.awaitAny(DEFAULT_TIMEOUT_MS,runStates);
|
||||
}
|
||||
|
||||
/**
|
||||
* Exactly like {@link #awaitAny(RunState...)}, except that it allows for a timeout, after which
|
||||
* the method will unblock and signal a timeout if the await condition is still not met.
|
||||
* @param runStates RunStates any of which allow unblocking
|
||||
* @param timeoutMillis Milliseconds to wait for any of the runstates before giving up
|
||||
* @return A {@link RunStateImage}, indicating success or failure, and the view of states at the time of evaluation
|
||||
*/
|
||||
public synchronized RunStateImage awaitAny(long timeoutMillis, RunState... runStates) {
|
||||
logger.debug(() -> "☐ Awaiting any " + Arrays.toString(runStates) + " for " + timeoutMillis + "ms");
|
||||
long timeoutAt = timeoutAt(timeoutMillis);
|
||||
|
||||
while (System.currentTimeMillis()<timeoutAt) {
|
||||
for (RunState runState : runStates) {
|
||||
if (counts[runState.ordinal()]>0) {
|
||||
logger.debug("☑ Awaited any " + toString(runStates));
|
||||
return new RunStateImage(this.counts,false);
|
||||
}
|
||||
}
|
||||
try {
|
||||
wait(timeoutAt-System.currentTimeMillis());
|
||||
} catch (InterruptedException e) {
|
||||
}
|
||||
}
|
||||
logger.debug(() -> "✘ TIMED-OUT awaiting any of " + toString(runStates));
|
||||
return new RunStateImage(this.counts,true);
|
||||
}
|
||||
|
||||
public String toString(RunState... runStates) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (RunState runState : runStates) {
|
||||
sb.append(runState.getCode()).append("(").append(counts[runState.ordinal()]).append(") ");
|
||||
}
|
||||
sb.setLength(sb.length()-1);
|
||||
return sb.toString();
|
||||
}
|
||||
public String toString() {
|
||||
return toString(RunState.values());
|
||||
}
|
||||
}
|
@ -0,0 +1,39 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.engine.api.activityimpl.motor;
|
||||
|
||||
import io.nosqlbench.engine.api.activityapi.core.RunState;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class RunStateImageTest {
|
||||
|
||||
@Test
|
||||
public void testMaxStateImage() {
|
||||
int[] counts = new int[RunState.values().length];
|
||||
counts[RunState.Running.ordinal()]=3;
|
||||
RunStateImage image = new RunStateImage(counts, false);
|
||||
assertThat(image.is(RunState.Running)).isTrue();
|
||||
assertThat(image.isTimeout()).isFalse();
|
||||
assertThat(image.is(RunState.Errored)).isFalse();
|
||||
assertThat(image.isOnly(RunState.Running)).isTrue();
|
||||
RunState maxState = image.getMaxState();
|
||||
assertThat(maxState).isEqualTo(RunState.values()[2]);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,214 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.engine.api.activityimpl.motor;
|
||||
|
||||
import io.nosqlbench.engine.api.activityapi.core.RunState;
|
||||
import org.junit.jupiter.api.*;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
|
||||
public class RunStateTallyTest {
|
||||
|
||||
volatile boolean awaited = false;
|
||||
volatile RunStateImage event = null;
|
||||
|
||||
@BeforeEach
|
||||
public void setup() {
|
||||
awaited = false;
|
||||
event = null;
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(1)
|
||||
public void testAwaitAny() {
|
||||
Thread.currentThread().setName("SETTER");
|
||||
|
||||
RunStateTally tally = new RunStateTally();
|
||||
awaited = false;
|
||||
Thread waiter = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
event = tally.awaitAny(RunState.Running);
|
||||
awaited = true;
|
||||
}
|
||||
});
|
||||
waiter.setName("WAITER");
|
||||
waiter.setDaemon(true);
|
||||
waiter.start();
|
||||
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
assertThat(awaited).isFalse();
|
||||
tally.add(RunState.Running);
|
||||
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
|
||||
assertThat(event.is(RunState.Running)).isTrue();
|
||||
assertThat(event.isOnly(RunState.Running)).isTrue();
|
||||
|
||||
assertThat(awaited).isTrue();
|
||||
assertThat(waiter.getState()).isNotEqualTo(Thread.State.RUNNABLE);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(2)
|
||||
public void testAwaitNoneOf() {
|
||||
Thread.currentThread().setName("SETTER");
|
||||
|
||||
RunStateTally tally = new RunStateTally();
|
||||
tally.add(RunState.Uninitialized);
|
||||
tally.add(RunState.Stopped);
|
||||
awaited = false;
|
||||
Thread waiter = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
tally.awaitNoneOf(RunState.Stopped, RunState.Uninitialized);
|
||||
awaited = true;
|
||||
}
|
||||
});
|
||||
waiter.setName("WAITER");
|
||||
waiter.setDaemon(true);
|
||||
waiter.start();
|
||||
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
assertThat(awaited).isFalse();
|
||||
tally.change(RunState.Stopped, RunState.Finished);
|
||||
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
assertThat(awaited).isFalse();
|
||||
tally.change(RunState.Uninitialized, RunState.Finished);
|
||||
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
assertThat(awaited).isTrue();
|
||||
assertThat(waiter.getState()).isNotEqualTo(Thread.State.RUNNABLE);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(3)
|
||||
public void testAwaitNoneOther() {
|
||||
Thread.currentThread().setName("SETTER");
|
||||
|
||||
RunStateTally tally = new RunStateTally();
|
||||
tally.add(RunState.Uninitialized);
|
||||
tally.add(RunState.Running);
|
||||
awaited = false;
|
||||
Thread waiter = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
event = tally.awaitNoneOther(RunState.Stopped, RunState.Finished);
|
||||
awaited = true;
|
||||
}
|
||||
});
|
||||
waiter.setName("WAITER");
|
||||
waiter.setDaemon(true);
|
||||
waiter.start();
|
||||
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
assertThat(awaited).isFalse();
|
||||
tally.change(RunState.Uninitialized, RunState.Finished);
|
||||
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
assertThat(awaited).isFalse();
|
||||
|
||||
// Note that neither Stopped or Finished are required to be positive,
|
||||
// as long as all others are zero total.
|
||||
tally.remove(RunState.Running);
|
||||
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
assertThat(awaited).isTrue();
|
||||
assertThat(waiter.getState()).isNotEqualTo(Thread.State.RUNNABLE);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@Order(4)
|
||||
public void testAwaitNoneOtherTimedOut() {
|
||||
Thread.currentThread().setName("SETTER");
|
||||
|
||||
RunStateTally tally = new RunStateTally();
|
||||
tally.add(RunState.Uninitialized);
|
||||
tally.add(RunState.Running);
|
||||
Thread waiter = new Thread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
event = tally.awaitNoneOther(1500, RunState.Stopped, RunState.Finished);
|
||||
awaited = true;
|
||||
}
|
||||
});
|
||||
waiter.setName("WAITER");
|
||||
waiter.setDaemon(true);
|
||||
waiter.start();
|
||||
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
assertThat(awaited).isFalse();
|
||||
tally.change(RunState.Uninitialized, RunState.Finished);
|
||||
|
||||
try {
|
||||
Thread.sleep(1500);
|
||||
} catch (Exception e) {
|
||||
}
|
||||
|
||||
// try {
|
||||
// waiter.join();
|
||||
// } catch (InterruptedException e) {
|
||||
// throw new RuntimeException(e);
|
||||
// }
|
||||
|
||||
assertThat(event.isOnly(RunState.Errored)).isFalse();
|
||||
assertThat(waiter.getState()).isNotEqualTo(Thread.State.RUNNABLE);
|
||||
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -38,7 +38,6 @@ public class CoreMotorTest {
|
||||
Motor cm = new CoreMotor(activity, 5L, lockstepper);
|
||||
AtomicLong observableAction = new AtomicLong(-3L);
|
||||
cm.setAction(getTestConsumer(observableAction));
|
||||
cm.getSlotStateTracker().enterState(RunState.Starting);
|
||||
Thread t = new Thread(cm);
|
||||
t.setName("TestMotor");
|
||||
t.start();
|
||||
@ -58,7 +57,6 @@ public class CoreMotorTest {
|
||||
AtomicLongArray ary = new AtomicLongArray(10);
|
||||
Action a1 = getTestArrayConsumer(ary);
|
||||
cm1.setAction(a1);
|
||||
cm1.getSlotStateTracker().enterState(RunState.Starting);
|
||||
|
||||
Thread t1 = new Thread(cm1);
|
||||
t1.setName("cm1");
|
||||
|
Loading…
Reference in New Issue
Block a user