Merge pull request #920 from nosqlbench/jeffb/test-fail-fix

Sporadic test failure - fix
This commit is contained in:
Jonathan Shook
2023-01-17 11:54:50 -06:00
committed by GitHub
4 changed files with 153 additions and 133 deletions

View File

@@ -31,6 +31,11 @@ jobs:
- name: Installing dependencies
run: mvn clean install -DskipTests=true -Dmaven.javadoc.skip=true -B -V
- name: Running tests
run: mvn -B test
- name: Collecting reports
run: tar -cvf codecov-report.tar target/coverage-report/**/*
- name: Uploading [nosqlbench] test coverage
uses: actions/upload-artifact@v3

View File

@@ -48,7 +48,6 @@ import java.util.stream.Collectors;
* <P>Some basic rules and invariants must be observed for consistent concurrent behavior.
* Any state changes for a Motor must be made through {@link Motor#getState()}.
* This allows the state tracking to work consistently for all observers.</p>
*
*/
public class ActivityExecutor implements ActivityController, ParameterMap.Listener, ProgressCapable, Callable<ExecutionResult> {

View File

@@ -35,136 +35,152 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
public class ActivityExecutorTest {
class ActivityExecutorTest {
private static final Logger logger = LogManager.getLogger(ActivityExecutorTest.class);
@Test
public synchronized void testRestart() {
ActivityDef ad = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;op=initdelay:initdelay=5000;");
Optional<ActivityType> activityType = new ActivityTypeLoader().load(ad);
Activity a = new DelayedInitActivity(ad);
InputDispenser idisp = new CoreInputDispenser(a);
ActionDispenser adisp = new CoreActionDispenser(a);
OutputDispenser tdisp = CoreServices.getOutputDispenser(a).orElse(null);
MotorDispenser<?> mdisp = new CoreMotorDispenser(a, idisp, adisp, tdisp);
a.setActionDispenserDelegate(adisp);
a.setOutputDispenserDelegate(tdisp);
a.setInputDispenserDelegate(idisp);
a.setMotorDispenserDelegate(mdisp);
ExecutorService executor = Executors.newCachedThreadPool();
ActivityExecutor ae = new ActivityExecutor(a, "test-restart");
Future<ExecutionResult> future = executor.submit(ae);
synchronized void testRestart() {
ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;op=initdelay:initdelay=5000;");
new ActivityTypeLoader().load(activityDef);
final Activity activity = new DelayedInitActivity(activityDef);
InputDispenser inputDispenser = new CoreInputDispenser(activity);
ActionDispenser adisp = new CoreActionDispenser(activity);
OutputDispenser tdisp = CoreServices.getOutputDispenser(activity).orElse(null);
final MotorDispenser<?> mdisp = new CoreMotorDispenser(activity, inputDispenser, adisp, tdisp);
activity.setActionDispenserDelegate(adisp);
activity.setOutputDispenserDelegate(tdisp);
activity.setInputDispenserDelegate(inputDispenser);
activity.setMotorDispenserDelegate(mdisp);
final ExecutorService executor = Executors.newCachedThreadPool();
ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-restart");
final Future<ExecutionResult> future = executor.submit(activityExecutor);
try {
ad.setThreads(1);
ae.startActivity();
ae.stopActivity();
ae.startActivity();
ae.startActivity();
ExecutionResult executionResult = future.get();
activityDef.setThreads(1);
activityExecutor.startActivity();
activityExecutor.stopActivity();
activityExecutor.startActivity();
activityExecutor.startActivity();
future.get();
Thread.sleep(500L);
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.print("ad.setThreads(1)");
executor.shutdown();
assertThat(idisp.getInput(10).getInputSegment(3)).isNull();
assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull();
}
@Test
public synchronized void testDelayedStartSanity() {
ActivityDef ad = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;initdelay=5000;");
Optional<ActivityType> activityType = new ActivityTypeLoader().load(ad);
Activity a = new DelayedInitActivity(ad);
InputDispenser idisp = new CoreInputDispenser(a);
ActionDispenser adisp = new CoreActionDispenser(a);
OutputDispenser tdisp = CoreServices.getOutputDispenser(a).orElse(null);
MotorDispenser<?> mdisp = new CoreMotorDispenser(a, idisp, adisp, tdisp);
a.setActionDispenserDelegate(adisp);
a.setOutputDispenserDelegate(tdisp);
a.setInputDispenserDelegate(idisp);
a.setMotorDispenserDelegate(mdisp);
synchronized void testDelayedStartSanity() {
final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;initdelay=5000;");
new ActivityTypeLoader().load(activityDef);
final Activity activity = new DelayedInitActivity(activityDef);
InputDispenser inputDispenser = new CoreInputDispenser(activity);
ActionDispenser actionDispenser = new CoreActionDispenser(activity);
OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null);
final MotorDispenser<?> motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser);
activity.setActionDispenserDelegate(actionDispenser);
activity.setOutputDispenserDelegate(outputDispenser);
activity.setInputDispenserDelegate(inputDispenser);
activity.setMotorDispenserDelegate(motorDispenser);
final ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-delayed-start");
final ExecutorService testExecutor = Executors.newCachedThreadPool();
final Future<ExecutionResult> future = testExecutor.submit(activityExecutor);
ActivityExecutor ae = new ActivityExecutor(a, "test-delayed-start");
ExecutorService testExecutor = Executors.newCachedThreadPool();
Future<ExecutionResult> future = testExecutor.submit(ae);
try {
ad.setThreads(1);
ae.startActivity();
ExecutionResult result = future.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
activityDef.setThreads(1);
activityExecutor.startActivity();
future.get();
testExecutor.shutdownNow();
assertThat(idisp.getInput(10).getInputSegment(3)).isNull();
} catch (Exception e) {
fail("Unexpected exception", e);
}
assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull();
}
@Test
public synchronized void testNewActivityExecutor() {
ActivityDef ad = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;");
Optional<ActivityType> activityType = new ActivityTypeLoader().load(ad);
Input longSupplier = new AtomicInput(ad);
MotorDispenser<?> cmf = getActivityMotorFactory(
ad, motorActionDelay(999), longSupplier
);
Activity a = new SimpleActivity(ad);
InputDispenser idisp = new CoreInputDispenser(a);
ActionDispenser adisp = new CoreActionDispenser(a);
OutputDispenser tdisp = CoreServices.getOutputDispenser(a).orElse(null);
MotorDispenser<?> mdisp = new CoreMotorDispenser(a, idisp, adisp, tdisp);
a.setActionDispenserDelegate(adisp);
a.setInputDispenserDelegate(idisp);
a.setMotorDispenserDelegate(mdisp);
synchronized void testNewActivityExecutor() {
ActivityExecutor ae = new ActivityExecutor(a, "test-new-executor");
ad.setThreads(5);
ae.startActivity();
ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;initdelay=5000;");
new ActivityTypeLoader().load(activityDef);
getActivityMotorFactory(motorActionDelay(999), new AtomicInput(activityDef));
final Activity simpleActivity = new SimpleActivity(activityDef);
InputDispenser inputDispenser = new CoreInputDispenser(simpleActivity);
ActionDispenser actionDispenser = new CoreActionDispenser(simpleActivity);
OutputDispenser outputDispenser = CoreServices.getOutputDispenser(simpleActivity).orElse(null);
final MotorDispenser<?> motorDispenser = new CoreMotorDispenser<>(simpleActivity,
inputDispenser, actionDispenser, outputDispenser);
simpleActivity.setActionDispenserDelegate(actionDispenser);
simpleActivity.setInputDispenserDelegate(inputDispenser);
simpleActivity.setMotorDispenserDelegate(motorDispenser);
final ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity, "test-new-executor");
activityDef.setThreads(5);
activityExecutor.startActivity();
int[] speeds = new int[]{1, 2000, 5, 2000, 2, 2000};
for (int offset = 0; offset < speeds.length; offset += 2) {
int threadTarget = speeds[offset];
int threadTime = speeds[offset + 1];
logger.info(() -> "Setting thread level to " + threadTarget + " for " + threadTime + " seconds.");
ad.setThreads(threadTarget);
logger.debug(() -> "Setting thread level to " + threadTarget + " for " + threadTime + " seconds.");
activityDef.setThreads(threadTarget);
try {
Thread.sleep(threadTime);
} catch (InterruptedException ignored) {
} catch (Exception e) {
fail("Not expecting exception", e);
}
}
ad.setThreads(0);
}
private MotorDispenser<?> getActivityMotorFactory(final ActivityDef ad, Action lc, final Input ls) {
MotorDispenser<?> cmf = new MotorDispenser<>() {
// Used for slowing the roll due to state transitions in test.
try {
activityExecutor.stopActivity();
Thread.sleep(2000L);
} catch (Exception e) {
fail("Not expecting exception", e);
}
}
private MotorDispenser<?> getActivityMotorFactory(Action lc, final Input ls) {
return new MotorDispenser<>() {
@Override
public Motor getMotor(ActivityDef activityDef, int slotId) {
Activity activity = new SimpleActivity(activityDef);
Motor<?> cm = new CoreMotor(activity, slotId, ls);
Motor<?> cm = new CoreMotor<>(activity, slotId, ls);
cm.setAction(lc);
return cm;
}
};
return cmf;
}
private SyncAction motorActionDelay(final long delay) {
SyncAction consumer = new SyncAction() {
return new SyncAction() {
@Override
public int runCycle(long cycle) {
System.out.println("consuming " + cycle + ", delaying:" + delay);
logger.info(() -> "consuming " + cycle + ", delaying:" + delay);
try {
Thread.sleep(delay);
} catch (InterruptedException ignored) {
@@ -172,7 +188,7 @@ public class ActivityExecutorTest {
return 0;
}
};
return consumer;
}
private static class DelayedInitActivity extends SimpleActivity {
@@ -184,13 +200,13 @@ public class ActivityExecutorTest {
@Override
public void initActivity() {
Integer initdelay = activityDef.getParams().getOptionalInteger("initdelay").orElse(0);
logger.info(() -> "delaying for " + initdelay);
Integer initDelay = activityDef.getParams().getOptionalInteger("initdelay").orElse(0);
logger.info(() -> "delaying for " + initDelay);
try {
Thread.sleep(initdelay);
Thread.sleep(initDelay);
} catch (InterruptedException ignored) {
}
logger.info(() -> "delayed for " + initdelay);
logger.info(() -> "delayed for " + initDelay);
}
}
}

View File

@@ -75,7 +75,7 @@ public class ActivityDef implements NBNamedElement {
ActivityDef activityDef = new ActivityDef(activityParameterMap.orElseThrow(
() -> new RuntimeException("Unable to parse:" + namedActivitySpec)
));
logger.debug("parsed activityDef " + namedActivitySpec + " to-> " + activityDef);
logger.info("parsed activityDef " + namedActivitySpec + " to-> " + activityDef);
return activityDef;
}