mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
move threadpool init within Callable flow
This commit is contained in:
parent
edd3de63de
commit
9fc0530c17
@ -79,6 +79,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
|||||||
// TODO: Doc how uninitialized activities do not propagate parameter map changes and how
|
// TODO: Doc how uninitialized activities do not propagate parameter map changes and how
|
||||||
// TODO: this is different from preventing modification to uninitialized activities
|
// TODO: this is different from preventing modification to uninitialized activities
|
||||||
|
|
||||||
|
// TODO: Determine whether this should really be synchronized
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simply stop the motors
|
* Simply stop the motors
|
||||||
*/
|
*/
|
||||||
@ -183,7 +185,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
|||||||
/**
|
/**
|
||||||
* Shutdown the activity executor, with a grace period for the motor threads.
|
* Shutdown the activity executor, with a grace period for the motor threads.
|
||||||
*
|
*
|
||||||
* @param initialMillisToWait milliseconds to wait after graceful shutdownActivity request, before forcing
|
* @param initialMillisToWait
|
||||||
|
* milliseconds to wait after graceful shutdownActivity request, before forcing
|
||||||
* everything to stop
|
* everything to stop
|
||||||
*/
|
*/
|
||||||
public synchronized void forceStopScenarioAndThrow(int initialMillisToWait, boolean rethrow) {
|
public synchronized void forceStopScenarioAndThrow(int initialMillisToWait, boolean rethrow) {
|
||||||
@ -234,7 +237,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
|||||||
/**
|
/**
|
||||||
* Stop extra motors, start missing motors
|
* Stop extra motors, start missing motors
|
||||||
*
|
*
|
||||||
* @param activityDef the activityDef for this activity instance
|
* @param activityDef
|
||||||
|
* the activityDef for this activity instance
|
||||||
*/
|
*/
|
||||||
private void adjustMotorCountToThreadParam(ActivityDef activityDef) {
|
private void adjustMotorCountToThreadParam(ActivityDef activityDef) {
|
||||||
logger.trace(() -> ">-pre-adjust->" + getSlotStatus());
|
logger.trace(() -> ">-pre-adjust->" + getSlotStatus());
|
||||||
@ -311,26 +315,28 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
|||||||
private void awaitAlignmentOfMotorStateToActivityState() {
|
private void awaitAlignmentOfMotorStateToActivityState() {
|
||||||
|
|
||||||
logger.debug(() -> "awaiting state alignment from " + activity.getRunState());
|
logger.debug(() -> "awaiting state alignment from " + activity.getRunState());
|
||||||
|
RunStateImage states = null;
|
||||||
switch (activity.getRunState()) {
|
switch (activity.getRunState()) {
|
||||||
case Starting:
|
case Starting:
|
||||||
case Running:
|
case Running:
|
||||||
tally.awaitNoneOther(RunState.Running, RunState.Finished);
|
states = tally.awaitNoneOther(RunState.Running, RunState.Finished);
|
||||||
break;
|
break;
|
||||||
case Errored:
|
case Errored:
|
||||||
case Stopping:
|
case Stopping:
|
||||||
case Stopped:
|
case Stopped:
|
||||||
tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored);
|
states = tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored);
|
||||||
break;
|
break;
|
||||||
case Uninitialized:
|
case Uninitialized:
|
||||||
break;
|
break;
|
||||||
case Finished:
|
case Finished:
|
||||||
tally.awaitNoneOther(RunState.Finished);
|
states = tally.awaitNoneOther(RunState.Finished);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new RuntimeException("Unmatched run state:" + activity.getRunState());
|
throw new RuntimeException("Unmatched run state:" + activity.getRunState());
|
||||||
}
|
}
|
||||||
logger.debug("activity and threads are aligned to state " + activity.getRunState() + " for " + this.getActivity().getAlias());
|
RunState previousState = activity.getRunState();
|
||||||
|
activity.setRunState(states.getMaxState());
|
||||||
|
logger.debug("activity and threads are aligned to state " + previousState + " for " + this.getActivity().getAlias() + ", and advanced to " + activity.getRunState());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -391,12 +397,16 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
|||||||
// instantiate and configure fixtures that need to be present
|
// instantiate and configure fixtures that need to be present
|
||||||
// before threads start running such as metrics instruments
|
// before threads start running such as metrics instruments
|
||||||
activity.initActivity();
|
activity.initActivity();
|
||||||
|
startMotorExecutorService();
|
||||||
|
startRunningActivityThreads();
|
||||||
awaitMotorsAtLeastRunning();
|
awaitMotorsAtLeastRunning();
|
||||||
|
logger.debug("STARTED " + activityDef.getAlias());
|
||||||
awaitActivityCompletion();
|
awaitActivityCompletion();
|
||||||
activity.shutdownActivity();
|
|
||||||
activity.closeAutoCloseables();
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
this.exception = e;
|
this.exception = e;
|
||||||
|
} finally {
|
||||||
|
activity.shutdownActivity();
|
||||||
|
activity.closeAutoCloseables();
|
||||||
}
|
}
|
||||||
ExecutionResult result = new ExecutionResult(startedAt, stoppedAt, "", exception);
|
ExecutionResult result = new ExecutionResult(startedAt, stoppedAt, "", exception);
|
||||||
return result;
|
return result;
|
||||||
|
@ -86,8 +86,6 @@ public class ScenarioController {
|
|||||||
Future<ExecutionResult> startedActivity = activitiesExecutor.submit(executor);
|
Future<ExecutionResult> startedActivity = activitiesExecutor.submit(executor);
|
||||||
ActivityRuntimeInfo activityRuntimeInfo = new ActivityRuntimeInfo(activity, startedActivity, executor);
|
ActivityRuntimeInfo activityRuntimeInfo = new ActivityRuntimeInfo(activity, startedActivity, executor);
|
||||||
this.activityInfoMap.put(activity.getAlias(), activityRuntimeInfo);
|
this.activityInfoMap.put(activity.getAlias(), activityRuntimeInfo);
|
||||||
executor.startActivity();
|
|
||||||
scenariologger.debug("STARTED " + activityDef.getAlias());
|
|
||||||
}
|
}
|
||||||
return this.activityInfoMap.get(activityDef.getAlias());
|
return this.activityInfoMap.get(activityDef.getAlias());
|
||||||
}
|
}
|
||||||
|
@ -69,21 +69,20 @@ class ActivityExecutorTest {
|
|||||||
activityExecutor.startActivity();
|
activityExecutor.startActivity();
|
||||||
activityExecutor.stopActivity();
|
activityExecutor.stopActivity();
|
||||||
activityExecutor.startActivity();
|
activityExecutor.startActivity();
|
||||||
activityExecutor.startActivity();
|
activityExecutor.stopActivity();
|
||||||
future.get();
|
future.get();
|
||||||
Thread.sleep(500L);
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw new RuntimeException(e);
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull();
|
assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNotNull();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
synchronized void testDelayedStartSanity() {
|
synchronized void testDelayedStartSanity() {
|
||||||
|
|
||||||
final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;initdelay=5000;");
|
final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;initdelay=2000;");
|
||||||
new ActivityTypeLoader().load(activityDef);
|
new ActivityTypeLoader().load(activityDef);
|
||||||
|
|
||||||
final Activity activity = new DelayedInitActivity(activityDef);
|
final Activity activity = new DelayedInitActivity(activityDef);
|
||||||
|
Loading…
Reference in New Issue
Block a user