diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/ActivityExecutor.java b/engine-core/src/main/java/io/nosqlbench/engine/core/ActivityExecutor.java index 7a50b7743..ad52de2fd 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/ActivityExecutor.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/ActivityExecutor.java @@ -1,17 +1,17 @@ /* -* Copyright 2015 jshook -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Copyright 2015 jshook + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package io.nosqlbench.engine.core; import io.nosqlbench.engine.api.activityapi.core.*; @@ -35,12 +35,11 @@ import java.util.stream.Collectors; /** *

An ActivityExecutor is a named instance of an execution harness for a single activity instance. - * It is responsible for managing threads and activity settings which may be changed while the - * activity is running.

+ * It is responsible for managing threads and activity settings which may be changed while the activity is running.

* *

An ActivityExecutor may be represent an activity that is defined and active in the running - * scenario, but which is inactive. This can occur when an activity is paused by controlling logic, - * or when the threads are set to zero.

+ * scenario, but which is inactive. This can occur when an activity is paused by controlling logic, or when the threads + * are set to zero.

* *

* Invariants: @@ -61,7 +60,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen private ExecutorService executorService; private RuntimeException stoppingException; - private final static int waitTime=10000; + private final static int waitTime = 10000; // private RunState intendedState = RunState.Uninitialized; @@ -84,9 +83,9 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen /** *

True-up the number of motor instances known to the executor. Start all non-running motors. - * The protocol between the motors and the executor should be safe as long as each state change - * is owned by either the motor logic or the activity executor but not both, and strictly serialized - * as well. This is enforced by forcing start(...) to be serialized as well as using CAS on the motor states.

+ * The protocol between the motors and the executor should be safe as long as each state change is owned by either + * the motor logic or the activity executor but not both, and strictly serialized as well. This is enforced by + * forcing start(...) to be serialized as well as using CAS on the motor states.

*

The startActivity method may be called to true-up the number of active motors in an activity executor after * changes to threads.

*/ @@ -98,7 +97,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen activity.initActivity(); //activity.onActivityDefUpdate(activityDef); } catch (Exception e) { - this.stoppingException = new RuntimeException("Error initializing activity '" + activity.getAlias() +"':\n" + e.getMessage(),e); + this.stoppingException = new RuntimeException("Error initializing activity '" + activity.getAlias() + "':\n" + e.getMessage(), e); // activitylogger.error("error initializing activity '" + activity.getAlias() + "': " + stoppingException); throw stoppingException; } @@ -128,9 +127,11 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen /** * Shutdown the activity executor, with a grace period for the motor threads. * - * @param initialMillisToWait milliseconds to wait after graceful shutdownActivity request, before forcing everything to stop + * @param initialMillisToWait milliseconds to wait after graceful shutdownActivity request, before forcing + * everything to stop */ public synchronized void forceStopExecutor(int initialMillisToWait) { + activitylogger.debug("FORCE STOP/before alias=(" + activity.getAlias() + ")"); activity.setRunState(RunState.Stopped); @@ -138,22 +139,37 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen executorService.shutdown(); requestStopMotors(); - try { - Thread.sleep(initialMillisToWait); - } catch (InterruptedException ignored) { + int divisor = 100; + int polltime = initialMillisToWait / divisor; + long gracefulWaitStartedAt = System.currentTimeMillis(); + long waitUntil = initialMillisToWait + gracefulWaitStartedAt; + long time = gracefulWaitStartedAt; + while (time < waitUntil && !executorService.isTerminated()) { + try { + Thread.sleep(polltime); + } catch (InterruptedException ignored) { + } + } + long gracefulWaitEndedAt = System.currentTimeMillis(); + logger.debug("took " + (gracefulWaitEndedAt - gracefulWaitStartedAt) + " ms to shutdown gracefully"); + + if (!executorService.isTerminated()) { + logger.info("stopping activity forcibly " + activity.getAlias()); + List runnables = executorService.shutdownNow(); + long forcibleShutdownCompletedAt = System.currentTimeMillis(); + logger.debug("took " + (forcibleShutdownCompletedAt - gracefulWaitEndedAt) + " ms to shutdown forcibly"); + logger.debug(runnables.size() + " tasks never started."); } - logger.info("stopping activity forcibly " + activity.getAlias()); - List runnables = executorService.shutdownNow(); - + long activityShutdownStartedAt = System.currentTimeMillis(); + logger.debug("invoking activity-specific shutdown hooks"); activity.shutdownActivity(); activity.closeAutoCloseables(); + long activityShutdownEndedAt = System.currentTimeMillis(); + logger.debug("took " + (activityShutdownEndedAt - activityShutdownStartedAt) + " ms to shutdown activity threads"); - logger.debug(runnables.size() + " threads never started."); - - if (stoppingException!=null) { + if (stoppingException != null) { activitylogger.debug("FORCE STOP/exception alias=(" + activity.getAlias() + ")"); - throw stoppingException; } activitylogger.debug("FORCE STOP/after alias=(" + activity.getAlias() + ")"); @@ -170,45 +186,45 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen executorService.shutdown(); boolean wasStopped = false; try { - logger.trace("awaiting termination with timeout of " + secondsToWait +" seconds"); + logger.trace("awaiting termination with timeout of " + secondsToWait + " seconds"); wasStopped = executorService.awaitTermination(secondsToWait, TimeUnit.SECONDS); } catch (InterruptedException ie) { logger.trace("interrupted while awaiting termination"); wasStopped = false; logger.warn("while waiting termination of activity " + activity.getAlias() + ", " + ie.getMessage()); - activitylogger.debug("REQUEST STOP/exception alias=(" + activity.getAlias() + ") wasstopped=" +wasStopped); + activitylogger.debug("REQUEST STOP/exception alias=(" + activity.getAlias() + ") wasstopped=" + wasStopped); } finally { logger.trace("finally shutting down activity " + this.getActivity().getAlias()); activity.shutdownActivity(); logger.trace("closing auto-closeables"); activity.closeAutoCloseables(); } - if (stoppingException!=null) { + if (stoppingException != null) { logger.trace("an exception caused the activity to stop:" + stoppingException.getMessage()); throw stoppingException; } - activitylogger.debug("REQUEST STOP/after alias=(" + activity.getAlias() + ") wasstopped=" +wasStopped); + activitylogger.debug("REQUEST STOP/after alias=(" + activity.getAlias() + ") wasstopped=" + wasStopped); return wasStopped; } /** - * Listens for changes to parameter maps, maps them to the activity instance, and notifies - * all eligible listeners of changes. + * Listens for changes to parameter maps, maps them to the activity instance, and notifies all eligible listeners of + * changes. */ @Override public synchronized void handleParameterMapUpdate(ParameterMap parameterMap) { if (activity instanceof ActivityDefObserver) { - ((ActivityDefObserver)activity).onActivityDefUpdate(activityDef); + ((ActivityDefObserver) activity).onActivityDefUpdate(activityDef); } // An activity must be initialized before the motors and other components are // considered ready to handle parameter map changes. This is signaled in an activity // by the RunState. - if (activity.getRunState()!=RunState.Uninitialized) { - if (activity.getRunState()==RunState.Running) { + if (activity.getRunState() != RunState.Uninitialized) { + if (activity.getRunState() == RunState.Running) { adjustToActivityDef(activity.getActivityDef()); } motors.stream() @@ -234,7 +250,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen if (awaited) { awaited = awaitCompletion(timeout); } - if (stoppingException!=null) { + if (stoppingException != null) { activitylogger.debug("AWAIT-FINISH/exception alias=(" + activity.getAlias() + ")"); throw stoppingException; } @@ -338,9 +354,9 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen /** * Await a thread (aka motor/slot) entering a specific SlotState * - * @param m motor instance - * @param waitTime milliseconds to wait, total - * @param pollTime polling interval between state checks + * @param m motor instance + * @param waitTime milliseconds to wait, total + * @param pollTime polling interval between state checks * @param runState any desired SlotState * @return true, if the desired SlotState was detected */ @@ -367,12 +383,12 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen long startedAt = System.currentTimeMillis(); boolean awaited = false; while (!awaited && (System.currentTimeMillis() < (startedAt + waitTime))) { - awaited=true; + awaited = true; for (Motor motor : motors) { awaited = awaitMotorState(motor, waitTime, pollTime, awaitingState); if (!awaited) { logger.trace("failed awaiting motor " + motor.getSlotId() + " for state in " + - Arrays.asList(awaitingState)); + Arrays.asList(awaitingState)); break; } } @@ -417,7 +433,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen if (!awaitedRequiredState) { String error = "Unable to await " + activityDef.getAlias() + "/Motor[" + m.getSlotId() + "]: from state " + startingState + " to " + m.getSlotStateTracker().getSlotState() - + " after waiting for " + waitTime +"ms"; + + " after waiting for " + waitTime + "ms"; RuntimeException e = new RuntimeException(error); logger.error(error); throw e; @@ -473,7 +489,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen public String getProgressDetails() { return motors.stream().map(Motor::getInput).distinct().findFirst() .filter(i -> i instanceof ProgressCapable) - .map(i -> ((ProgressCapable)i).getProgressDetails()).orElse(""); + .map(i -> ((ProgressCapable) i).getProgressDetails()).orElse(""); } @@ -493,21 +509,21 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen public synchronized void notifyException(Thread t, Throwable e) { //logger.error("Uncaught exception in activity thread forwarded to activity executor:", e); - this.stoppingException=new RuntimeException("Error in activity thread " +t.getName(), e); + this.stoppingException = new RuntimeException("Error in activity thread " + t.getName(), e); forceStopExecutor(10000); } @Override public synchronized void stopActivityWithReasonAsync(String reason) { logger.info("Stopping activity " + this.activityDef.getAlias() + ": " + reason); - this.stoppingException=new RuntimeException("Stopping activity " + this.activityDef.getAlias() + ": " + reason); + this.stoppingException = new RuntimeException("Stopping activity " + this.activityDef.getAlias() + ": " + reason); logger.error("stopping with reason: " + stoppingException); requestStopMotors(); } @Override public synchronized void stopActivityWithErrorAsync(Throwable throwable) { - if (stoppingException==null) { + if (stoppingException == null) { this.stoppingException = new RuntimeException(throwable); logger.error("stopping on error: " + throwable.toString(), throwable); } else { diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/script/Scenario.java b/engine-core/src/main/java/io/nosqlbench/engine/core/script/Scenario.java index f7f9a45bb..d02a14d1c 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/script/Scenario.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/script/Scenario.java @@ -217,6 +217,7 @@ public class Scenario implements Callable { System.err.flush(); System.out.flush(); } catch (Exception e) { + this.scenarioController.forceStopScenario(5000); throw new RuntimeException(e); } finally { System.out.flush();