Merge pull request #920 from nosqlbench/jeffb/test-fail-fix

Sporadic test failure - fix
This commit is contained in:
Jonathan Shook
2023-01-17 11:54:50 -06:00
committed by GitHub
4 changed files with 153 additions and 133 deletions

View File

@@ -31,6 +31,11 @@ jobs:
- name: Installing dependencies - name: Installing dependencies
run: mvn clean install -DskipTests=true -Dmaven.javadoc.skip=true -B -V run: mvn clean install -DskipTests=true -Dmaven.javadoc.skip=true -B -V
- name: Running tests
run: mvn -B test
- name: Collecting reports
run: tar -cvf codecov-report.tar target/coverage-report/**/*
- name: Uploading [nosqlbench] test coverage - name: Uploading [nosqlbench] test coverage
uses: actions/upload-artifact@v3 uses: actions/upload-artifact@v3

View File

@@ -48,7 +48,6 @@ import java.util.stream.Collectors;
* <P>Some basic rules and invariants must be observed for consistent concurrent behavior. * <P>Some basic rules and invariants must be observed for consistent concurrent behavior.
* Any state changes for a Motor must be made through {@link Motor#getState()}. * Any state changes for a Motor must be made through {@link Motor#getState()}.
* This allows the state tracking to work consistently for all observers.</p> * This allows the state tracking to work consistently for all observers.</p>
*
*/ */
public class ActivityExecutor implements ActivityController, ParameterMap.Listener, ProgressCapable, Callable<ExecutionResult> { public class ActivityExecutor implements ActivityController, ParameterMap.Listener, ProgressCapable, Callable<ExecutionResult> {
@@ -91,20 +90,20 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
tally.awaitNoneOther(RunState.Stopped, RunState.Finished); tally.awaitNoneOther(RunState.Stopped, RunState.Finished);
shutdownExecutorService(Integer.MAX_VALUE); shutdownExecutorService(Integer.MAX_VALUE);
tally.awaitNoneOther(RunState.Stopped,RunState.Finished); tally.awaitNoneOther(RunState.Stopped, RunState.Finished);
activity.setRunState(RunState.Stopped); activity.setRunState(RunState.Stopped);
logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots"); logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
Annotators.recordAnnotation(Annotation.newBuilder() Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId) .session(sessionId)
.interval(this.startedAt, this.stoppedAt) .interval(this.startedAt, this.stoppedAt)
.layer(Layer.Activity) .layer(Layer.Activity)
.label("alias", getActivityDef().getAlias()) .label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType()) .label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none")) .label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString()) .detail("params", getActivityDef().toString())
.build() .build()
); );
} }
@@ -118,20 +117,20 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
motors.forEach(Motor::requestStop); motors.forEach(Motor::requestStop);
shutdownExecutorService(Integer.MAX_VALUE); shutdownExecutorService(Integer.MAX_VALUE);
tally.awaitNoneOther(RunState.Stopped,RunState.Finished); tally.awaitNoneOther(RunState.Stopped, RunState.Finished);
activity.setRunState(RunState.Stopped); activity.setRunState(RunState.Stopped);
logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots"); logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
Annotators.recordAnnotation(Annotation.newBuilder() Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId) .session(sessionId)
.interval(this.startedAt, this.stoppedAt) .interval(this.startedAt, this.stoppedAt)
.layer(Layer.Activity) .layer(Layer.Activity)
.label("alias", getActivityDef().getAlias()) .label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType()) .label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none")) .label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString()) .detail("params", getActivityDef().toString())
.build() .build()
); );
} }
@@ -211,10 +210,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
adjustMotorCountToThreadParam(activity.getActivityDef()); adjustMotorCountToThreadParam(activity.getActivityDef());
} }
motors.stream() motors.stream()
.filter(m -> (m instanceof ActivityDefObserver)) .filter(m -> (m instanceof ActivityDefObserver))
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Uninitialized) // .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Uninitialized)
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting) // .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef)); .forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
} }
} }
@@ -228,8 +227,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private String getSlotStatus() { private String getSlotStatus() {
return motors.stream() return motors.stream()
.map(m -> m.getState().get().getCode()) .map(m -> m.getState().get().getCode())
.collect(Collectors.joining(",", "[", "]")); .collect(Collectors.joining(",", "[", "]"));
} }
/** /**
@@ -286,17 +285,17 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
case Running: case Running:
case Starting: case Starting:
motors.stream() motors.stream()
.filter(m -> m.getState().get() != RunState.Running) .filter(m -> m.getState().get() != RunState.Running)
.filter(m -> m.getState().get() != RunState.Finished) .filter(m -> m.getState().get() != RunState.Finished)
.filter(m -> m.getState().get() != RunState.Starting) .filter(m -> m.getState().get() != RunState.Starting)
.forEach(m -> { .forEach(m -> {
executorService.execute(m); executorService.execute(m);
}); });
break; break;
case Stopped: case Stopped:
motors.stream() motors.stream()
.filter(m -> m.getState().get() != RunState.Stopped) .filter(m -> m.getState().get() != RunState.Stopped)
.forEach(Motor::requestStop); .forEach(Motor::requestStop);
break; break;
case Finished: case Finished:
case Stopping: case Stopping:
@@ -309,7 +308,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private void awaitAlignmentOfMotorStateToActivityState() { private void awaitAlignmentOfMotorStateToActivityState() {
logger.debug(()->"awaiting state alignment from " + activity.getRunState()); logger.debug(() -> "awaiting state alignment from " + activity.getRunState());
switch (activity.getRunState()) { switch (activity.getRunState()) {
case Starting: case Starting:
case Running: case Running:
@@ -412,7 +411,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private void awaitMotorsAtLeastRunning() { private void awaitMotorsAtLeastRunning() {
RunStateImage states = tally.awaitAny(RunState.Running, RunState.Stopped, RunState.Finished, RunState.Errored); RunStateImage states = tally.awaitAny(RunState.Running, RunState.Stopped, RunState.Finished, RunState.Errored);
RunState maxState = states.getMaxState(); RunState maxState = states.getMaxState();
if (maxState==RunState.Errored) { if (maxState == RunState.Errored) {
activity.setRunState(maxState); activity.setRunState(maxState);
throw new RuntimeException("Error in activity"); throw new RuntimeException("Error in activity");
} }
@@ -463,17 +462,17 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
RunStateImage state = tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored); RunStateImage state = tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored);
RunState maxState = state.getMaxState(); RunState maxState = state.getMaxState();
activity.setRunState(maxState); activity.setRunState(maxState);
if (maxState==RunState.Errored) { if (maxState == RunState.Errored) {
throw new RuntimeException("Error while waiting for activity completion:" + this.exception); throw new RuntimeException("Error while waiting for activity completion:" + this.exception);
} }
} }
private void startMotorExecutorService() { private void startMotorExecutorService() {
this.executorService = new ThreadPoolExecutor( this.executorService = new ThreadPoolExecutor(
0, Integer.MAX_VALUE, 0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS, 0L, TimeUnit.SECONDS,
new SynchronousQueue<>(), new SynchronousQueue<>(),
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this)) new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
); );
} }
@@ -490,14 +489,14 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
logger.info(() -> "starting activity " + activity.getAlias() + " for cycles " + activity.getCycleSummary()); logger.info(() -> "starting activity " + activity.getAlias() + " for cycles " + activity.getCycleSummary());
Annotators.recordAnnotation(Annotation.newBuilder() Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId) .session(sessionId)
.now() .now()
.layer(Layer.Activity) .layer(Layer.Activity)
.label("alias", getActivityDef().getAlias()) .label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType()) .label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none")) .label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString()) .detail("params", getActivityDef().toString())
.build() .build()
); );
activitylogger.debug("START/before alias=(" + activity.getAlias() + ")"); activitylogger.debug("START/before alias=(" + activity.getAlias() + ")");
@@ -508,11 +507,11 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
activity.onActivityDefUpdate(activityDef); activity.onActivityDefUpdate(activityDef);
} catch (Exception e) { } catch (Exception e) {
this.exception = new RuntimeException("Error initializing activity '" + activity.getAlias() + "':\n" + e.getMessage(), e); this.exception = new RuntimeException("Error initializing activity '" + activity.getAlias() + "':\n" + e.getMessage(), e);
activitylogger.error(()->"error initializing activity '" + activity.getAlias() + "': " + exception); activitylogger.error(() -> "error initializing activity '" + activity.getAlias() + "': " + exception);
throw new RuntimeException(exception); throw new RuntimeException(exception);
} }
adjustMotorCountToThreadParam(activity.getActivityDef()); adjustMotorCountToThreadParam(activity.getActivityDef());
tally.awaitAny(RunState.Running,RunState.Finished,RunState.Stopped); tally.awaitAny(RunState.Running, RunState.Finished, RunState.Stopped);
activity.setRunState(RunState.Running); activity.setRunState(RunState.Running);
activitylogger.debug("START/after alias=(" + activity.getAlias() + ")"); activitylogger.debug("START/after alias=(" + activity.getAlias() + ")");
} }

View File

@@ -35,136 +35,152 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
public class ActivityExecutorTest { class ActivityExecutorTest {
private static final Logger logger = LogManager.getLogger(ActivityExecutorTest.class); private static final Logger logger = LogManager.getLogger(ActivityExecutorTest.class);
@Test @Test
public synchronized void testRestart() { synchronized void testRestart() {
ActivityDef ad = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;op=initdelay:initdelay=5000;"); ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;op=initdelay:initdelay=5000;");
Optional<ActivityType> activityType = new ActivityTypeLoader().load(ad); new ActivityTypeLoader().load(activityDef);
Activity a = new DelayedInitActivity(ad);
InputDispenser idisp = new CoreInputDispenser(a); final Activity activity = new DelayedInitActivity(activityDef);
ActionDispenser adisp = new CoreActionDispenser(a); InputDispenser inputDispenser = new CoreInputDispenser(activity);
OutputDispenser tdisp = CoreServices.getOutputDispenser(a).orElse(null); ActionDispenser adisp = new CoreActionDispenser(activity);
MotorDispenser<?> mdisp = new CoreMotorDispenser(a, idisp, adisp, tdisp); OutputDispenser tdisp = CoreServices.getOutputDispenser(activity).orElse(null);
a.setActionDispenserDelegate(adisp);
a.setOutputDispenserDelegate(tdisp); final MotorDispenser<?> mdisp = new CoreMotorDispenser(activity, inputDispenser, adisp, tdisp);
a.setInputDispenserDelegate(idisp); activity.setActionDispenserDelegate(adisp);
a.setMotorDispenserDelegate(mdisp); activity.setOutputDispenserDelegate(tdisp);
ExecutorService executor = Executors.newCachedThreadPool(); activity.setInputDispenserDelegate(inputDispenser);
ActivityExecutor ae = new ActivityExecutor(a, "test-restart"); activity.setMotorDispenserDelegate(mdisp);
Future<ExecutionResult> future = executor.submit(ae);
final ExecutorService executor = Executors.newCachedThreadPool();
ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-restart");
final Future<ExecutionResult> future = executor.submit(activityExecutor);
try { try {
ad.setThreads(1); activityDef.setThreads(1);
ae.startActivity(); activityExecutor.startActivity();
ae.stopActivity(); activityExecutor.stopActivity();
ae.startActivity(); activityExecutor.startActivity();
ae.startActivity(); activityExecutor.startActivity();
ExecutionResult executionResult = future.get(); future.get();
Thread.sleep(500L); Thread.sleep(500L);
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException(e); throw new RuntimeException(e);
} }
System.out.print("ad.setThreads(1)");
executor.shutdown(); executor.shutdown();
assertThat(idisp.getInput(10).getInputSegment(3)).isNull(); assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull();
} }
@Test @Test
public synchronized void testDelayedStartSanity() { synchronized void testDelayedStartSanity() {
ActivityDef ad = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;initdelay=5000;");
Optional<ActivityType> activityType = new ActivityTypeLoader().load(ad); final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;initdelay=5000;");
Activity a = new DelayedInitActivity(ad); new ActivityTypeLoader().load(activityDef);
InputDispenser idisp = new CoreInputDispenser(a);
ActionDispenser adisp = new CoreActionDispenser(a); final Activity activity = new DelayedInitActivity(activityDef);
OutputDispenser tdisp = CoreServices.getOutputDispenser(a).orElse(null); InputDispenser inputDispenser = new CoreInputDispenser(activity);
MotorDispenser<?> mdisp = new CoreMotorDispenser(a, idisp, adisp, tdisp); ActionDispenser actionDispenser = new CoreActionDispenser(activity);
a.setActionDispenserDelegate(adisp); OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null);
a.setOutputDispenserDelegate(tdisp);
a.setInputDispenserDelegate(idisp); final MotorDispenser<?> motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser);
a.setMotorDispenserDelegate(mdisp); activity.setActionDispenserDelegate(actionDispenser);
activity.setOutputDispenserDelegate(outputDispenser);
activity.setInputDispenserDelegate(inputDispenser);
activity.setMotorDispenserDelegate(motorDispenser);
final ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-delayed-start");
final ExecutorService testExecutor = Executors.newCachedThreadPool();
final Future<ExecutionResult> future = testExecutor.submit(activityExecutor);
ActivityExecutor ae = new ActivityExecutor(a, "test-delayed-start");
ExecutorService testExecutor = Executors.newCachedThreadPool();
Future<ExecutionResult> future = testExecutor.submit(ae);
try { try {
ad.setThreads(1); activityDef.setThreads(1);
ae.startActivity(); activityExecutor.startActivity();
ExecutionResult result = future.get(); future.get();
} catch (InterruptedException e) { testExecutor.shutdownNow();
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
testExecutor.shutdownNow();
assertThat(idisp.getInput(10).getInputSegment(3)).isNull();
} catch (Exception e) {
fail("Unexpected exception", e);
}
assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull();
} }
@Test @Test
public synchronized void testNewActivityExecutor() { synchronized void testNewActivityExecutor() {
ActivityDef ad = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;");
Optional<ActivityType> activityType = new ActivityTypeLoader().load(ad);
Input longSupplier = new AtomicInput(ad);
MotorDispenser<?> cmf = getActivityMotorFactory(
ad, motorActionDelay(999), longSupplier
);
Activity a = new SimpleActivity(ad);
InputDispenser idisp = new CoreInputDispenser(a);
ActionDispenser adisp = new CoreActionDispenser(a);
OutputDispenser tdisp = CoreServices.getOutputDispenser(a).orElse(null);
MotorDispenser<?> mdisp = new CoreMotorDispenser(a, idisp, adisp, tdisp);
a.setActionDispenserDelegate(adisp);
a.setInputDispenserDelegate(idisp);
a.setMotorDispenserDelegate(mdisp);
ActivityExecutor ae = new ActivityExecutor(a, "test-new-executor"); ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;initdelay=5000;");
ad.setThreads(5); new ActivityTypeLoader().load(activityDef);
ae.startActivity();
getActivityMotorFactory(motorActionDelay(999), new AtomicInput(activityDef));
final Activity simpleActivity = new SimpleActivity(activityDef);
InputDispenser inputDispenser = new CoreInputDispenser(simpleActivity);
ActionDispenser actionDispenser = new CoreActionDispenser(simpleActivity);
OutputDispenser outputDispenser = CoreServices.getOutputDispenser(simpleActivity).orElse(null);
final MotorDispenser<?> motorDispenser = new CoreMotorDispenser<>(simpleActivity,
inputDispenser, actionDispenser, outputDispenser);
simpleActivity.setActionDispenserDelegate(actionDispenser);
simpleActivity.setInputDispenserDelegate(inputDispenser);
simpleActivity.setMotorDispenserDelegate(motorDispenser);
final ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity, "test-new-executor");
activityDef.setThreads(5);
activityExecutor.startActivity();
int[] speeds = new int[]{1, 2000, 5, 2000, 2, 2000}; int[] speeds = new int[]{1, 2000, 5, 2000, 2, 2000};
for (int offset = 0; offset < speeds.length; offset += 2) { for (int offset = 0; offset < speeds.length; offset += 2) {
int threadTarget = speeds[offset]; int threadTarget = speeds[offset];
int threadTime = speeds[offset + 1]; int threadTime = speeds[offset + 1];
logger.info(() -> "Setting thread level to " + threadTarget + " for " + threadTime + " seconds.");
ad.setThreads(threadTarget); logger.debug(() -> "Setting thread level to " + threadTarget + " for " + threadTime + " seconds.");
activityDef.setThreads(threadTarget);
try { try {
Thread.sleep(threadTime); Thread.sleep(threadTime);
} catch (InterruptedException ignored) { } catch (Exception e) {
fail("Not expecting exception", e);
} }
} }
ad.setThreads(0);
// Used for slowing the roll due to state transitions in test.
try {
activityExecutor.stopActivity();
Thread.sleep(2000L);
} catch (Exception e) {
fail("Not expecting exception", e);
}
} }
private MotorDispenser<?> getActivityMotorFactory(final ActivityDef ad, Action lc, final Input ls) { private MotorDispenser<?> getActivityMotorFactory(Action lc, final Input ls) {
MotorDispenser<?> cmf = new MotorDispenser<>() { return new MotorDispenser<>() {
@Override @Override
public Motor getMotor(ActivityDef activityDef, int slotId) { public Motor getMotor(ActivityDef activityDef, int slotId) {
Activity activity = new SimpleActivity(activityDef); Activity activity = new SimpleActivity(activityDef);
Motor<?> cm = new CoreMotor(activity, slotId, ls); Motor<?> cm = new CoreMotor<>(activity, slotId, ls);
cm.setAction(lc); cm.setAction(lc);
return cm; return cm;
} }
}; };
return cmf;
} }
private SyncAction motorActionDelay(final long delay) { private SyncAction motorActionDelay(final long delay) {
SyncAction consumer = new SyncAction() { return new SyncAction() {
@Override @Override
public int runCycle(long cycle) { public int runCycle(long cycle) {
System.out.println("consuming " + cycle + ", delaying:" + delay); logger.info(() -> "consuming " + cycle + ", delaying:" + delay);
try { try {
Thread.sleep(delay); Thread.sleep(delay);
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
@@ -172,7 +188,7 @@ public class ActivityExecutorTest {
return 0; return 0;
} }
}; };
return consumer;
} }
private static class DelayedInitActivity extends SimpleActivity { private static class DelayedInitActivity extends SimpleActivity {
@@ -184,13 +200,13 @@ public class ActivityExecutorTest {
@Override @Override
public void initActivity() { public void initActivity() {
Integer initdelay = activityDef.getParams().getOptionalInteger("initdelay").orElse(0); Integer initDelay = activityDef.getParams().getOptionalInteger("initdelay").orElse(0);
logger.info(() -> "delaying for " + initdelay); logger.info(() -> "delaying for " + initDelay);
try { try {
Thread.sleep(initdelay); Thread.sleep(initDelay);
} catch (InterruptedException ignored) { } catch (InterruptedException ignored) {
} }
logger.info(() -> "delayed for " + initdelay); logger.info(() -> "delayed for " + initDelay);
} }
} }
} }

View File

@@ -75,7 +75,7 @@ public class ActivityDef implements NBNamedElement {
ActivityDef activityDef = new ActivityDef(activityParameterMap.orElseThrow( ActivityDef activityDef = new ActivityDef(activityParameterMap.orElseThrow(
() -> new RuntimeException("Unable to parse:" + namedActivitySpec) () -> new RuntimeException("Unable to parse:" + namedActivitySpec)
)); ));
logger.debug("parsed activityDef " + namedActivitySpec + " to-> " + activityDef); logger.info("parsed activityDef " + namedActivitySpec + " to-> " + activityDef);
return activityDef; return activityDef;
} }