mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Merge pull request #920 from nosqlbench/jeffb/test-fail-fix
Sporadic test failure - fix
This commit is contained in:
5
.github/workflows/build.yml
vendored
5
.github/workflows/build.yml
vendored
@@ -31,6 +31,11 @@ jobs:
|
|||||||
- name: Installing dependencies
|
- name: Installing dependencies
|
||||||
run: mvn clean install -DskipTests=true -Dmaven.javadoc.skip=true -B -V
|
run: mvn clean install -DskipTests=true -Dmaven.javadoc.skip=true -B -V
|
||||||
|
|
||||||
|
- name: Running tests
|
||||||
|
run: mvn -B test
|
||||||
|
|
||||||
|
- name: Collecting reports
|
||||||
|
run: tar -cvf codecov-report.tar target/coverage-report/**/*
|
||||||
|
|
||||||
- name: Uploading [nosqlbench] test coverage
|
- name: Uploading [nosqlbench] test coverage
|
||||||
uses: actions/upload-artifact@v3
|
uses: actions/upload-artifact@v3
|
||||||
|
|||||||
@@ -48,7 +48,6 @@ import java.util.stream.Collectors;
|
|||||||
* <P>Some basic rules and invariants must be observed for consistent concurrent behavior.
|
* <P>Some basic rules and invariants must be observed for consistent concurrent behavior.
|
||||||
* Any state changes for a Motor must be made through {@link Motor#getState()}.
|
* Any state changes for a Motor must be made through {@link Motor#getState()}.
|
||||||
* This allows the state tracking to work consistently for all observers.</p>
|
* This allows the state tracking to work consistently for all observers.</p>
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
public class ActivityExecutor implements ActivityController, ParameterMap.Listener, ProgressCapable, Callable<ExecutionResult> {
|
public class ActivityExecutor implements ActivityController, ParameterMap.Listener, ProgressCapable, Callable<ExecutionResult> {
|
||||||
@@ -91,20 +90,20 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
|||||||
tally.awaitNoneOther(RunState.Stopped, RunState.Finished);
|
tally.awaitNoneOther(RunState.Stopped, RunState.Finished);
|
||||||
|
|
||||||
shutdownExecutorService(Integer.MAX_VALUE);
|
shutdownExecutorService(Integer.MAX_VALUE);
|
||||||
tally.awaitNoneOther(RunState.Stopped,RunState.Finished);
|
tally.awaitNoneOther(RunState.Stopped, RunState.Finished);
|
||||||
activity.setRunState(RunState.Stopped);
|
activity.setRunState(RunState.Stopped);
|
||||||
|
|
||||||
logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
|
logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
|
||||||
|
|
||||||
Annotators.recordAnnotation(Annotation.newBuilder()
|
Annotators.recordAnnotation(Annotation.newBuilder()
|
||||||
.session(sessionId)
|
.session(sessionId)
|
||||||
.interval(this.startedAt, this.stoppedAt)
|
.interval(this.startedAt, this.stoppedAt)
|
||||||
.layer(Layer.Activity)
|
.layer(Layer.Activity)
|
||||||
.label("alias", getActivityDef().getAlias())
|
.label("alias", getActivityDef().getAlias())
|
||||||
.label("driver", getActivityDef().getActivityType())
|
.label("driver", getActivityDef().getActivityType())
|
||||||
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
|
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
|
||||||
.detail("params", getActivityDef().toString())
|
.detail("params", getActivityDef().toString())
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -118,20 +117,20 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
|||||||
motors.forEach(Motor::requestStop);
|
motors.forEach(Motor::requestStop);
|
||||||
|
|
||||||
shutdownExecutorService(Integer.MAX_VALUE);
|
shutdownExecutorService(Integer.MAX_VALUE);
|
||||||
tally.awaitNoneOther(RunState.Stopped,RunState.Finished);
|
tally.awaitNoneOther(RunState.Stopped, RunState.Finished);
|
||||||
activity.setRunState(RunState.Stopped);
|
activity.setRunState(RunState.Stopped);
|
||||||
|
|
||||||
logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
|
logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
|
||||||
|
|
||||||
Annotators.recordAnnotation(Annotation.newBuilder()
|
Annotators.recordAnnotation(Annotation.newBuilder()
|
||||||
.session(sessionId)
|
.session(sessionId)
|
||||||
.interval(this.startedAt, this.stoppedAt)
|
.interval(this.startedAt, this.stoppedAt)
|
||||||
.layer(Layer.Activity)
|
.layer(Layer.Activity)
|
||||||
.label("alias", getActivityDef().getAlias())
|
.label("alias", getActivityDef().getAlias())
|
||||||
.label("driver", getActivityDef().getActivityType())
|
.label("driver", getActivityDef().getActivityType())
|
||||||
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
|
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
|
||||||
.detail("params", getActivityDef().toString())
|
.detail("params", getActivityDef().toString())
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -211,10 +210,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
|||||||
adjustMotorCountToThreadParam(activity.getActivityDef());
|
adjustMotorCountToThreadParam(activity.getActivityDef());
|
||||||
}
|
}
|
||||||
motors.stream()
|
motors.stream()
|
||||||
.filter(m -> (m instanceof ActivityDefObserver))
|
.filter(m -> (m instanceof ActivityDefObserver))
|
||||||
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Uninitialized)
|
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Uninitialized)
|
||||||
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
|
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
|
||||||
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
|
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -228,8 +227,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
|||||||
|
|
||||||
private String getSlotStatus() {
|
private String getSlotStatus() {
|
||||||
return motors.stream()
|
return motors.stream()
|
||||||
.map(m -> m.getState().get().getCode())
|
.map(m -> m.getState().get().getCode())
|
||||||
.collect(Collectors.joining(",", "[", "]"));
|
.collect(Collectors.joining(",", "[", "]"));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -286,17 +285,17 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
|||||||
case Running:
|
case Running:
|
||||||
case Starting:
|
case Starting:
|
||||||
motors.stream()
|
motors.stream()
|
||||||
.filter(m -> m.getState().get() != RunState.Running)
|
.filter(m -> m.getState().get() != RunState.Running)
|
||||||
.filter(m -> m.getState().get() != RunState.Finished)
|
.filter(m -> m.getState().get() != RunState.Finished)
|
||||||
.filter(m -> m.getState().get() != RunState.Starting)
|
.filter(m -> m.getState().get() != RunState.Starting)
|
||||||
.forEach(m -> {
|
.forEach(m -> {
|
||||||
executorService.execute(m);
|
executorService.execute(m);
|
||||||
});
|
});
|
||||||
break;
|
break;
|
||||||
case Stopped:
|
case Stopped:
|
||||||
motors.stream()
|
motors.stream()
|
||||||
.filter(m -> m.getState().get() != RunState.Stopped)
|
.filter(m -> m.getState().get() != RunState.Stopped)
|
||||||
.forEach(Motor::requestStop);
|
.forEach(Motor::requestStop);
|
||||||
break;
|
break;
|
||||||
case Finished:
|
case Finished:
|
||||||
case Stopping:
|
case Stopping:
|
||||||
@@ -309,7 +308,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
|||||||
|
|
||||||
private void awaitAlignmentOfMotorStateToActivityState() {
|
private void awaitAlignmentOfMotorStateToActivityState() {
|
||||||
|
|
||||||
logger.debug(()->"awaiting state alignment from " + activity.getRunState());
|
logger.debug(() -> "awaiting state alignment from " + activity.getRunState());
|
||||||
switch (activity.getRunState()) {
|
switch (activity.getRunState()) {
|
||||||
case Starting:
|
case Starting:
|
||||||
case Running:
|
case Running:
|
||||||
@@ -412,7 +411,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
|||||||
private void awaitMotorsAtLeastRunning() {
|
private void awaitMotorsAtLeastRunning() {
|
||||||
RunStateImage states = tally.awaitAny(RunState.Running, RunState.Stopped, RunState.Finished, RunState.Errored);
|
RunStateImage states = tally.awaitAny(RunState.Running, RunState.Stopped, RunState.Finished, RunState.Errored);
|
||||||
RunState maxState = states.getMaxState();
|
RunState maxState = states.getMaxState();
|
||||||
if (maxState==RunState.Errored) {
|
if (maxState == RunState.Errored) {
|
||||||
activity.setRunState(maxState);
|
activity.setRunState(maxState);
|
||||||
throw new RuntimeException("Error in activity");
|
throw new RuntimeException("Error in activity");
|
||||||
}
|
}
|
||||||
@@ -463,17 +462,17 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
|||||||
RunStateImage state = tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored);
|
RunStateImage state = tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored);
|
||||||
RunState maxState = state.getMaxState();
|
RunState maxState = state.getMaxState();
|
||||||
activity.setRunState(maxState);
|
activity.setRunState(maxState);
|
||||||
if (maxState==RunState.Errored) {
|
if (maxState == RunState.Errored) {
|
||||||
throw new RuntimeException("Error while waiting for activity completion:" + this.exception);
|
throw new RuntimeException("Error while waiting for activity completion:" + this.exception);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void startMotorExecutorService() {
|
private void startMotorExecutorService() {
|
||||||
this.executorService = new ThreadPoolExecutor(
|
this.executorService = new ThreadPoolExecutor(
|
||||||
0, Integer.MAX_VALUE,
|
0, Integer.MAX_VALUE,
|
||||||
0L, TimeUnit.SECONDS,
|
0L, TimeUnit.SECONDS,
|
||||||
new SynchronousQueue<>(),
|
new SynchronousQueue<>(),
|
||||||
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
|
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -490,14 +489,14 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
|||||||
|
|
||||||
logger.info(() -> "starting activity " + activity.getAlias() + " for cycles " + activity.getCycleSummary());
|
logger.info(() -> "starting activity " + activity.getAlias() + " for cycles " + activity.getCycleSummary());
|
||||||
Annotators.recordAnnotation(Annotation.newBuilder()
|
Annotators.recordAnnotation(Annotation.newBuilder()
|
||||||
.session(sessionId)
|
.session(sessionId)
|
||||||
.now()
|
.now()
|
||||||
.layer(Layer.Activity)
|
.layer(Layer.Activity)
|
||||||
.label("alias", getActivityDef().getAlias())
|
.label("alias", getActivityDef().getAlias())
|
||||||
.label("driver", getActivityDef().getActivityType())
|
.label("driver", getActivityDef().getActivityType())
|
||||||
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
|
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
|
||||||
.detail("params", getActivityDef().toString())
|
.detail("params", getActivityDef().toString())
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
|
|
||||||
activitylogger.debug("START/before alias=(" + activity.getAlias() + ")");
|
activitylogger.debug("START/before alias=(" + activity.getAlias() + ")");
|
||||||
@@ -508,11 +507,11 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
|||||||
activity.onActivityDefUpdate(activityDef);
|
activity.onActivityDefUpdate(activityDef);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
this.exception = new RuntimeException("Error initializing activity '" + activity.getAlias() + "':\n" + e.getMessage(), e);
|
this.exception = new RuntimeException("Error initializing activity '" + activity.getAlias() + "':\n" + e.getMessage(), e);
|
||||||
activitylogger.error(()->"error initializing activity '" + activity.getAlias() + "': " + exception);
|
activitylogger.error(() -> "error initializing activity '" + activity.getAlias() + "': " + exception);
|
||||||
throw new RuntimeException(exception);
|
throw new RuntimeException(exception);
|
||||||
}
|
}
|
||||||
adjustMotorCountToThreadParam(activity.getActivityDef());
|
adjustMotorCountToThreadParam(activity.getActivityDef());
|
||||||
tally.awaitAny(RunState.Running,RunState.Finished,RunState.Stopped);
|
tally.awaitAny(RunState.Running, RunState.Finished, RunState.Stopped);
|
||||||
activity.setRunState(RunState.Running);
|
activity.setRunState(RunState.Running);
|
||||||
activitylogger.debug("START/after alias=(" + activity.getAlias() + ")");
|
activitylogger.debug("START/after alias=(" + activity.getAlias() + ")");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -35,136 +35,152 @@ import org.apache.logging.log4j.LogManager;
|
|||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
import java.util.Optional;
|
|
||||||
import java.util.concurrent.ExecutionException;
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
import static org.assertj.core.api.Assertions.fail;
|
||||||
|
|
||||||
public class ActivityExecutorTest {
|
class ActivityExecutorTest {
|
||||||
private static final Logger logger = LogManager.getLogger(ActivityExecutorTest.class);
|
private static final Logger logger = LogManager.getLogger(ActivityExecutorTest.class);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public synchronized void testRestart() {
|
synchronized void testRestart() {
|
||||||
ActivityDef ad = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;op=initdelay:initdelay=5000;");
|
ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;op=initdelay:initdelay=5000;");
|
||||||
Optional<ActivityType> activityType = new ActivityTypeLoader().load(ad);
|
new ActivityTypeLoader().load(activityDef);
|
||||||
Activity a = new DelayedInitActivity(ad);
|
|
||||||
InputDispenser idisp = new CoreInputDispenser(a);
|
final Activity activity = new DelayedInitActivity(activityDef);
|
||||||
ActionDispenser adisp = new CoreActionDispenser(a);
|
InputDispenser inputDispenser = new CoreInputDispenser(activity);
|
||||||
OutputDispenser tdisp = CoreServices.getOutputDispenser(a).orElse(null);
|
ActionDispenser adisp = new CoreActionDispenser(activity);
|
||||||
MotorDispenser<?> mdisp = new CoreMotorDispenser(a, idisp, adisp, tdisp);
|
OutputDispenser tdisp = CoreServices.getOutputDispenser(activity).orElse(null);
|
||||||
a.setActionDispenserDelegate(adisp);
|
|
||||||
a.setOutputDispenserDelegate(tdisp);
|
final MotorDispenser<?> mdisp = new CoreMotorDispenser(activity, inputDispenser, adisp, tdisp);
|
||||||
a.setInputDispenserDelegate(idisp);
|
activity.setActionDispenserDelegate(adisp);
|
||||||
a.setMotorDispenserDelegate(mdisp);
|
activity.setOutputDispenserDelegate(tdisp);
|
||||||
ExecutorService executor = Executors.newCachedThreadPool();
|
activity.setInputDispenserDelegate(inputDispenser);
|
||||||
ActivityExecutor ae = new ActivityExecutor(a, "test-restart");
|
activity.setMotorDispenserDelegate(mdisp);
|
||||||
Future<ExecutionResult> future = executor.submit(ae);
|
|
||||||
|
final ExecutorService executor = Executors.newCachedThreadPool();
|
||||||
|
ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-restart");
|
||||||
|
final Future<ExecutionResult> future = executor.submit(activityExecutor);
|
||||||
try {
|
try {
|
||||||
ad.setThreads(1);
|
activityDef.setThreads(1);
|
||||||
ae.startActivity();
|
activityExecutor.startActivity();
|
||||||
ae.stopActivity();
|
activityExecutor.stopActivity();
|
||||||
ae.startActivity();
|
activityExecutor.startActivity();
|
||||||
ae.startActivity();
|
activityExecutor.startActivity();
|
||||||
ExecutionResult executionResult = future.get();
|
future.get();
|
||||||
Thread.sleep(500L);
|
Thread.sleep(500L);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
System.out.print("ad.setThreads(1)");
|
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
assertThat(idisp.getInput(10).getInputSegment(3)).isNull();
|
assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public synchronized void testDelayedStartSanity() {
|
synchronized void testDelayedStartSanity() {
|
||||||
ActivityDef ad = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;initdelay=5000;");
|
|
||||||
Optional<ActivityType> activityType = new ActivityTypeLoader().load(ad);
|
final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;initdelay=5000;");
|
||||||
Activity a = new DelayedInitActivity(ad);
|
new ActivityTypeLoader().load(activityDef);
|
||||||
InputDispenser idisp = new CoreInputDispenser(a);
|
|
||||||
ActionDispenser adisp = new CoreActionDispenser(a);
|
final Activity activity = new DelayedInitActivity(activityDef);
|
||||||
OutputDispenser tdisp = CoreServices.getOutputDispenser(a).orElse(null);
|
InputDispenser inputDispenser = new CoreInputDispenser(activity);
|
||||||
MotorDispenser<?> mdisp = new CoreMotorDispenser(a, idisp, adisp, tdisp);
|
ActionDispenser actionDispenser = new CoreActionDispenser(activity);
|
||||||
a.setActionDispenserDelegate(adisp);
|
OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null);
|
||||||
a.setOutputDispenserDelegate(tdisp);
|
|
||||||
a.setInputDispenserDelegate(idisp);
|
final MotorDispenser<?> motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser);
|
||||||
a.setMotorDispenserDelegate(mdisp);
|
activity.setActionDispenserDelegate(actionDispenser);
|
||||||
|
activity.setOutputDispenserDelegate(outputDispenser);
|
||||||
|
activity.setInputDispenserDelegate(inputDispenser);
|
||||||
|
activity.setMotorDispenserDelegate(motorDispenser);
|
||||||
|
|
||||||
|
final ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-delayed-start");
|
||||||
|
|
||||||
|
final ExecutorService testExecutor = Executors.newCachedThreadPool();
|
||||||
|
final Future<ExecutionResult> future = testExecutor.submit(activityExecutor);
|
||||||
|
|
||||||
ActivityExecutor ae = new ActivityExecutor(a, "test-delayed-start");
|
|
||||||
ExecutorService testExecutor = Executors.newCachedThreadPool();
|
|
||||||
Future<ExecutionResult> future = testExecutor.submit(ae);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
ad.setThreads(1);
|
activityDef.setThreads(1);
|
||||||
ae.startActivity();
|
activityExecutor.startActivity();
|
||||||
ExecutionResult result = future.get();
|
future.get();
|
||||||
} catch (InterruptedException e) {
|
testExecutor.shutdownNow();
|
||||||
throw new RuntimeException(e);
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
testExecutor.shutdownNow();
|
|
||||||
assertThat(idisp.getInput(10).getInputSegment(3)).isNull();
|
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("Unexpected exception", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public synchronized void testNewActivityExecutor() {
|
synchronized void testNewActivityExecutor() {
|
||||||
ActivityDef ad = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;");
|
|
||||||
Optional<ActivityType> activityType = new ActivityTypeLoader().load(ad);
|
|
||||||
Input longSupplier = new AtomicInput(ad);
|
|
||||||
MotorDispenser<?> cmf = getActivityMotorFactory(
|
|
||||||
ad, motorActionDelay(999), longSupplier
|
|
||||||
);
|
|
||||||
Activity a = new SimpleActivity(ad);
|
|
||||||
InputDispenser idisp = new CoreInputDispenser(a);
|
|
||||||
ActionDispenser adisp = new CoreActionDispenser(a);
|
|
||||||
OutputDispenser tdisp = CoreServices.getOutputDispenser(a).orElse(null);
|
|
||||||
MotorDispenser<?> mdisp = new CoreMotorDispenser(a, idisp, adisp, tdisp);
|
|
||||||
a.setActionDispenserDelegate(adisp);
|
|
||||||
a.setInputDispenserDelegate(idisp);
|
|
||||||
a.setMotorDispenserDelegate(mdisp);
|
|
||||||
|
|
||||||
ActivityExecutor ae = new ActivityExecutor(a, "test-new-executor");
|
ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;initdelay=5000;");
|
||||||
ad.setThreads(5);
|
new ActivityTypeLoader().load(activityDef);
|
||||||
ae.startActivity();
|
|
||||||
|
getActivityMotorFactory(motorActionDelay(999), new AtomicInput(activityDef));
|
||||||
|
|
||||||
|
final Activity simpleActivity = new SimpleActivity(activityDef);
|
||||||
|
InputDispenser inputDispenser = new CoreInputDispenser(simpleActivity);
|
||||||
|
ActionDispenser actionDispenser = new CoreActionDispenser(simpleActivity);
|
||||||
|
OutputDispenser outputDispenser = CoreServices.getOutputDispenser(simpleActivity).orElse(null);
|
||||||
|
|
||||||
|
final MotorDispenser<?> motorDispenser = new CoreMotorDispenser<>(simpleActivity,
|
||||||
|
inputDispenser, actionDispenser, outputDispenser);
|
||||||
|
|
||||||
|
simpleActivity.setActionDispenserDelegate(actionDispenser);
|
||||||
|
simpleActivity.setInputDispenserDelegate(inputDispenser);
|
||||||
|
simpleActivity.setMotorDispenserDelegate(motorDispenser);
|
||||||
|
|
||||||
|
final ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity, "test-new-executor");
|
||||||
|
activityDef.setThreads(5);
|
||||||
|
activityExecutor.startActivity();
|
||||||
|
|
||||||
int[] speeds = new int[]{1, 2000, 5, 2000, 2, 2000};
|
int[] speeds = new int[]{1, 2000, 5, 2000, 2, 2000};
|
||||||
for (int offset = 0; offset < speeds.length; offset += 2) {
|
for (int offset = 0; offset < speeds.length; offset += 2) {
|
||||||
int threadTarget = speeds[offset];
|
int threadTarget = speeds[offset];
|
||||||
int threadTime = speeds[offset + 1];
|
int threadTime = speeds[offset + 1];
|
||||||
logger.info(() -> "Setting thread level to " + threadTarget + " for " + threadTime + " seconds.");
|
|
||||||
ad.setThreads(threadTarget);
|
logger.debug(() -> "Setting thread level to " + threadTarget + " for " + threadTime + " seconds.");
|
||||||
|
activityDef.setThreads(threadTarget);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
Thread.sleep(threadTime);
|
Thread.sleep(threadTime);
|
||||||
} catch (InterruptedException ignored) {
|
} catch (Exception e) {
|
||||||
|
fail("Not expecting exception", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ad.setThreads(0);
|
|
||||||
|
|
||||||
|
// Used for slowing the roll due to state transitions in test.
|
||||||
|
try {
|
||||||
|
activityExecutor.stopActivity();
|
||||||
|
Thread.sleep(2000L);
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("Not expecting exception", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private MotorDispenser<?> getActivityMotorFactory(final ActivityDef ad, Action lc, final Input ls) {
|
private MotorDispenser<?> getActivityMotorFactory(Action lc, final Input ls) {
|
||||||
MotorDispenser<?> cmf = new MotorDispenser<>() {
|
return new MotorDispenser<>() {
|
||||||
@Override
|
@Override
|
||||||
public Motor getMotor(ActivityDef activityDef, int slotId) {
|
public Motor getMotor(ActivityDef activityDef, int slotId) {
|
||||||
Activity activity = new SimpleActivity(activityDef);
|
Activity activity = new SimpleActivity(activityDef);
|
||||||
Motor<?> cm = new CoreMotor(activity, slotId, ls);
|
Motor<?> cm = new CoreMotor<>(activity, slotId, ls);
|
||||||
cm.setAction(lc);
|
cm.setAction(lc);
|
||||||
return cm;
|
return cm;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return cmf;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private SyncAction motorActionDelay(final long delay) {
|
private SyncAction motorActionDelay(final long delay) {
|
||||||
SyncAction consumer = new SyncAction() {
|
return new SyncAction() {
|
||||||
@Override
|
@Override
|
||||||
public int runCycle(long cycle) {
|
public int runCycle(long cycle) {
|
||||||
System.out.println("consuming " + cycle + ", delaying:" + delay);
|
logger.info(() -> "consuming " + cycle + ", delaying:" + delay);
|
||||||
try {
|
try {
|
||||||
Thread.sleep(delay);
|
Thread.sleep(delay);
|
||||||
} catch (InterruptedException ignored) {
|
} catch (InterruptedException ignored) {
|
||||||
@@ -172,7 +188,7 @@ public class ActivityExecutorTest {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
return consumer;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class DelayedInitActivity extends SimpleActivity {
|
private static class DelayedInitActivity extends SimpleActivity {
|
||||||
@@ -184,13 +200,13 @@ public class ActivityExecutorTest {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void initActivity() {
|
public void initActivity() {
|
||||||
Integer initdelay = activityDef.getParams().getOptionalInteger("initdelay").orElse(0);
|
Integer initDelay = activityDef.getParams().getOptionalInteger("initdelay").orElse(0);
|
||||||
logger.info(() -> "delaying for " + initdelay);
|
logger.info(() -> "delaying for " + initDelay);
|
||||||
try {
|
try {
|
||||||
Thread.sleep(initdelay);
|
Thread.sleep(initDelay);
|
||||||
} catch (InterruptedException ignored) {
|
} catch (InterruptedException ignored) {
|
||||||
}
|
}
|
||||||
logger.info(() -> "delayed for " + initdelay);
|
logger.info(() -> "delayed for " + initDelay);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -75,7 +75,7 @@ public class ActivityDef implements NBNamedElement {
|
|||||||
ActivityDef activityDef = new ActivityDef(activityParameterMap.orElseThrow(
|
ActivityDef activityDef = new ActivityDef(activityParameterMap.orElseThrow(
|
||||||
() -> new RuntimeException("Unable to parse:" + namedActivitySpec)
|
() -> new RuntimeException("Unable to parse:" + namedActivitySpec)
|
||||||
));
|
));
|
||||||
logger.debug("parsed activityDef " + namedActivitySpec + " to-> " + activityDef);
|
logger.info("parsed activityDef " + namedActivitySpec + " to-> " + activityDef);
|
||||||
|
|
||||||
return activityDef;
|
return activityDef;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user