shutdown delay is much lower

This commit is contained in:
Jonathan Shook 2020-07-17 12:26:37 -05:00
parent 8af5706544
commit e0b682a40c
2 changed files with 70 additions and 53 deletions

View File

@ -1,17 +1,17 @@
/*
* Copyright 2015 jshook
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
* Copyright 2015 jshook
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core;
import io.nosqlbench.engine.api.activityapi.core.*;
@ -35,12 +35,11 @@ import java.util.stream.Collectors;
/**
* <p>An ActivityExecutor is a named instance of an execution harness for a single activity instance.
* It is responsible for managing threads and activity settings which may be changed while the
* activity is running.</p>
* It is responsible for managing threads and activity settings which may be changed while the activity is running.</p>
*
* <p>An ActivityExecutor may be represent an activity that is defined and active in the running
* scenario, but which is inactive. This can occur when an activity is paused by controlling logic,
* or when the threads are set to zero.</p>
* scenario, but which is inactive. This can occur when an activity is paused by controlling logic, or when the threads
* are set to zero.</p>
*
* <p>
* Invariants:
@ -61,7 +60,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private ExecutorService executorService;
private RuntimeException stoppingException;
private final static int waitTime=10000;
private final static int waitTime = 10000;
// private RunState intendedState = RunState.Uninitialized;
@ -84,9 +83,9 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
/**
* <p>True-up the number of motor instances known to the executor. Start all non-running motors.
* The protocol between the motors and the executor should be safe as long as each state change
* is owned by either the motor logic or the activity executor but not both, and strictly serialized
* as well. This is enforced by forcing start(...) to be serialized as well as using CAS on the motor states.</p>
* The protocol between the motors and the executor should be safe as long as each state change is owned by either
* the motor logic or the activity executor but not both, and strictly serialized as well. This is enforced by
* forcing start(...) to be serialized as well as using CAS on the motor states.</p>
* <p>The startActivity method may be called to true-up the number of active motors in an activity executor after
* changes to threads.</p>
*/
@ -98,7 +97,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
activity.initActivity();
//activity.onActivityDefUpdate(activityDef);
} catch (Exception e) {
this.stoppingException = new RuntimeException("Error initializing activity '" + activity.getAlias() +"':\n" + e.getMessage(),e);
this.stoppingException = new RuntimeException("Error initializing activity '" + activity.getAlias() + "':\n" + e.getMessage(), e);
// activitylogger.error("error initializing activity '" + activity.getAlias() + "': " + stoppingException);
throw stoppingException;
}
@ -128,9 +127,11 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
/**
* Shutdown the activity executor, with a grace period for the motor threads.
*
* @param initialMillisToWait milliseconds to wait after graceful shutdownActivity request, before forcing everything to stop
* @param initialMillisToWait milliseconds to wait after graceful shutdownActivity request, before forcing
* everything to stop
*/
public synchronized void forceStopExecutor(int initialMillisToWait) {
activitylogger.debug("FORCE STOP/before alias=(" + activity.getAlias() + ")");
activity.setRunState(RunState.Stopped);
@ -138,22 +139,37 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
executorService.shutdown();
requestStopMotors();
try {
Thread.sleep(initialMillisToWait);
} catch (InterruptedException ignored) {
int divisor = 100;
int polltime = initialMillisToWait / divisor;
long gracefulWaitStartedAt = System.currentTimeMillis();
long waitUntil = initialMillisToWait + gracefulWaitStartedAt;
long time = gracefulWaitStartedAt;
while (time < waitUntil && !executorService.isTerminated()) {
try {
Thread.sleep(polltime);
} catch (InterruptedException ignored) {
}
}
long gracefulWaitEndedAt = System.currentTimeMillis();
logger.debug("took " + (gracefulWaitEndedAt - gracefulWaitStartedAt) + " ms to shutdown gracefully");
if (!executorService.isTerminated()) {
logger.info("stopping activity forcibly " + activity.getAlias());
List<Runnable> runnables = executorService.shutdownNow();
long forcibleShutdownCompletedAt = System.currentTimeMillis();
logger.debug("took " + (forcibleShutdownCompletedAt - gracefulWaitEndedAt) + " ms to shutdown forcibly");
logger.debug(runnables.size() + " tasks never started.");
}
logger.info("stopping activity forcibly " + activity.getAlias());
List<Runnable> runnables = executorService.shutdownNow();
long activityShutdownStartedAt = System.currentTimeMillis();
logger.debug("invoking activity-specific shutdown hooks");
activity.shutdownActivity();
activity.closeAutoCloseables();
long activityShutdownEndedAt = System.currentTimeMillis();
logger.debug("took " + (activityShutdownEndedAt - activityShutdownStartedAt) + " ms to shutdown activity threads");
logger.debug(runnables.size() + " threads never started.");
if (stoppingException!=null) {
if (stoppingException != null) {
activitylogger.debug("FORCE STOP/exception alias=(" + activity.getAlias() + ")");
throw stoppingException;
}
activitylogger.debug("FORCE STOP/after alias=(" + activity.getAlias() + ")");
@ -170,45 +186,45 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
executorService.shutdown();
boolean wasStopped = false;
try {
logger.trace("awaiting termination with timeout of " + secondsToWait +" seconds");
logger.trace("awaiting termination with timeout of " + secondsToWait + " seconds");
wasStopped = executorService.awaitTermination(secondsToWait, TimeUnit.SECONDS);
} catch (InterruptedException ie) {
logger.trace("interrupted while awaiting termination");
wasStopped = false;
logger.warn("while waiting termination of activity " + activity.getAlias() + ", " + ie.getMessage());
activitylogger.debug("REQUEST STOP/exception alias=(" + activity.getAlias() + ") wasstopped=" +wasStopped);
activitylogger.debug("REQUEST STOP/exception alias=(" + activity.getAlias() + ") wasstopped=" + wasStopped);
} finally {
logger.trace("finally shutting down activity " + this.getActivity().getAlias());
activity.shutdownActivity();
logger.trace("closing auto-closeables");
activity.closeAutoCloseables();
}
if (stoppingException!=null) {
if (stoppingException != null) {
logger.trace("an exception caused the activity to stop:" + stoppingException.getMessage());
throw stoppingException;
}
activitylogger.debug("REQUEST STOP/after alias=(" + activity.getAlias() + ") wasstopped=" +wasStopped);
activitylogger.debug("REQUEST STOP/after alias=(" + activity.getAlias() + ") wasstopped=" + wasStopped);
return wasStopped;
}
/**
* Listens for changes to parameter maps, maps them to the activity instance, and notifies
* all eligible listeners of changes.
* Listens for changes to parameter maps, maps them to the activity instance, and notifies all eligible listeners of
* changes.
*/
@Override
public synchronized void handleParameterMapUpdate(ParameterMap parameterMap) {
if (activity instanceof ActivityDefObserver) {
((ActivityDefObserver)activity).onActivityDefUpdate(activityDef);
((ActivityDefObserver) activity).onActivityDefUpdate(activityDef);
}
// An activity must be initialized before the motors and other components are
// considered ready to handle parameter map changes. This is signaled in an activity
// by the RunState.
if (activity.getRunState()!=RunState.Uninitialized) {
if (activity.getRunState()==RunState.Running) {
if (activity.getRunState() != RunState.Uninitialized) {
if (activity.getRunState() == RunState.Running) {
adjustToActivityDef(activity.getActivityDef());
}
motors.stream()
@ -234,7 +250,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
if (awaited) {
awaited = awaitCompletion(timeout);
}
if (stoppingException!=null) {
if (stoppingException != null) {
activitylogger.debug("AWAIT-FINISH/exception alias=(" + activity.getAlias() + ")");
throw stoppingException;
}
@ -338,9 +354,9 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
/**
* Await a thread (aka motor/slot) entering a specific SlotState
*
* @param m motor instance
* @param waitTime milliseconds to wait, total
* @param pollTime polling interval between state checks
* @param m motor instance
* @param waitTime milliseconds to wait, total
* @param pollTime polling interval between state checks
* @param runState any desired SlotState
* @return true, if the desired SlotState was detected
*/
@ -367,12 +383,12 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
long startedAt = System.currentTimeMillis();
boolean awaited = false;
while (!awaited && (System.currentTimeMillis() < (startedAt + waitTime))) {
awaited=true;
awaited = true;
for (Motor motor : motors) {
awaited = awaitMotorState(motor, waitTime, pollTime, awaitingState);
if (!awaited) {
logger.trace("failed awaiting motor " + motor.getSlotId() + " for state in " +
Arrays.asList(awaitingState));
Arrays.asList(awaitingState));
break;
}
}
@ -417,7 +433,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
if (!awaitedRequiredState) {
String error = "Unable to await " + activityDef.getAlias() +
"/Motor[" + m.getSlotId() + "]: from state " + startingState + " to " + m.getSlotStateTracker().getSlotState()
+ " after waiting for " + waitTime +"ms";
+ " after waiting for " + waitTime + "ms";
RuntimeException e = new RuntimeException(error);
logger.error(error);
throw e;
@ -473,7 +489,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
public String getProgressDetails() {
return motors.stream().map(Motor::getInput).distinct().findFirst()
.filter(i -> i instanceof ProgressCapable)
.map(i -> ((ProgressCapable)i).getProgressDetails()).orElse("");
.map(i -> ((ProgressCapable) i).getProgressDetails()).orElse("");
}
@ -493,21 +509,21 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
public synchronized void notifyException(Thread t, Throwable e) {
//logger.error("Uncaught exception in activity thread forwarded to activity executor:", e);
this.stoppingException=new RuntimeException("Error in activity thread " +t.getName(), e);
this.stoppingException = new RuntimeException("Error in activity thread " + t.getName(), e);
forceStopExecutor(10000);
}
@Override
public synchronized void stopActivityWithReasonAsync(String reason) {
logger.info("Stopping activity " + this.activityDef.getAlias() + ": " + reason);
this.stoppingException=new RuntimeException("Stopping activity " + this.activityDef.getAlias() + ": " + reason);
this.stoppingException = new RuntimeException("Stopping activity " + this.activityDef.getAlias() + ": " + reason);
logger.error("stopping with reason: " + stoppingException);
requestStopMotors();
}
@Override
public synchronized void stopActivityWithErrorAsync(Throwable throwable) {
if (stoppingException==null) {
if (stoppingException == null) {
this.stoppingException = new RuntimeException(throwable);
logger.error("stopping on error: " + throwable.toString(), throwable);
} else {

View File

@ -217,6 +217,7 @@ public class Scenario implements Callable<ScenarioResult> {
System.err.flush();
System.out.flush();
} catch (Exception e) {
this.scenarioController.forceStopScenario(5000);
throw new RuntimeException(e);
} finally {
System.out.flush();