From 86c4dfd9669e168d7203339fbc59682bd15de7aa Mon Sep 17 00:00:00 2001 From: Jonathan Shook Date: Wed, 30 Nov 2022 11:17:10 -0600 Subject: [PATCH] partial fixes for nosqlbench-797 Race condition between exceptional activity shutdown and normal scenario shutdown. --- ...sync_activity-Lifecycle_of_an_activity.svg | 236 ++++++++++++++++++ devdocs/concurrency/async_activity.puml | 105 ++++++++ ...-Lifecycle_of_a_single_scenario_call__.svg | 160 ++++++++++++ devdocs/concurrency/async_scenario.puml | 67 +++++ ...async_scenarios-Lifecycle_of_Scenarios.svg | 150 +++++++++++ devdocs/concurrency/async_scenarios.puml | 62 +++++ .../engine/api/activityapi/core/RunState.java | 98 ++++---- .../java/io/nosqlbench/engine/cli/NBCLI.java | 6 +- .../lifecycle/ActivityExceptionHandler.java | 4 +- .../core/lifecycle/ActivityFinisher.java | 4 +- .../engine/core/lifecycle/ActivityStatus.java | 20 ++ ...cutor.java => ActivityThreadsManager.java} | 180 ++++++------- ...arioResult.java => ExecMetricsResult.java} | 67 +---- .../engine/core/lifecycle/ExecResult.java | 68 +++++ .../core/lifecycle/ScenarioController.java | 67 ++--- .../core/lifecycle/ScenariosResults.java | 21 +- .../core/lifecycle/StartedActivityInfo.java | 29 +++ .../engine/core/script/Scenario.java | 61 +++-- .../engine/core/script/ScenariosExecutor.java | 24 +- ...t.java => ActivityThreadsManagerTest.java} | 12 +- .../resources/ScenarioExecutorEndpoint.java | 6 +- .../engine/rest/transfertypes/ResultView.java | 6 +- .../nbr/examples/ScriptExampleTests.java | 50 ++-- .../examples/SpeedCheckIntegrationTests.java | 6 +- .../testing/ExitStatusIntegrationTests.java | 28 +-- 25 files changed, 1184 insertions(+), 353 deletions(-) create mode 100644 devdocs/concurrency/async_activity-Lifecycle_of_an_activity.svg create mode 100644 devdocs/concurrency/async_activity.puml create mode 100644 devdocs/concurrency/async_scenario-Lifecycle_of_a_single_scenario_call__.svg create mode 100644 devdocs/concurrency/async_scenario.puml create mode 100644 devdocs/concurrency/async_scenarios-Lifecycle_of_Scenarios.svg create mode 100644 devdocs/concurrency/async_scenarios.puml create mode 100644 engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityStatus.java rename engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/{ActivityExecutor.java => ActivityThreadsManager.java} (77%) rename engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/{ScenarioResult.java => ExecMetricsResult.java} (68%) create mode 100644 engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ExecResult.java create mode 100644 engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/StartedActivityInfo.java rename engine-core/src/test/java/io/nosqlbench/engine/core/{ActivityExecutorTest.java => ActivityThreadsManagerTest.java} (94%) diff --git a/devdocs/concurrency/async_activity-Lifecycle_of_an_activity.svg b/devdocs/concurrency/async_activity-Lifecycle_of_an_activity.svg new file mode 100644 index 000000000..e159d5c6d --- /dev/null +++ b/devdocs/concurrency/async_activity-Lifecycle_of_an_activity.svg @@ -0,0 +1,236 @@ + + +Lifecycle of an activitycallercallerActivityExecutorActivityExceptionHandlerActivityThreadFactoryExecutorServiceAnnotatorAnnotatorActivityActivitythreadstartup sequencecreateActivityExecutorcreateActivityExceptionHandlercreate(\w Exception Handler)ActivityThreadFactory<injectedvia ctor>create(\w Thread Factory)ExecutorService<injectedvia ctor>startActivity()Annotate Activity StartinitActivity()align threadcount as explained belowdynamic threadcount updatethreads can be changed dynamicallyapply paramsalign motor countstop extra motors<start missing motors>for each new thread/motorexecute(<motor>)get()createthread<thread>run()At this point, themotor thread starts runningthe defined activity's actionover cyclesawait thread state updateshutdown sequence [after startup]stopActivity()request stop motorsawait all stopshutdownActivity()Annotate Activity Finishon exception in motor threadcatch(<thrown exception>)notifyException(<thread>,<throwable>)save exceptionforceStopActivity()shutdown();if needed[after timeout]]shutdownNow();shutdownActivity();closeAutoCloseables();actionthreadterminates diff --git a/devdocs/concurrency/async_activity.puml b/devdocs/concurrency/async_activity.puml new file mode 100644 index 000000000..cb276a443 --- /dev/null +++ b/devdocs/concurrency/async_activity.puml @@ -0,0 +1,105 @@ +@startuml +'https://plantuml.com/sequence-diagram +title: Lifecycle of an activity + +control caller as caller +control ActivityExecutor as ae +control "Activity\nException\nHandler" as aeh +control "Activity\nThread\nFactory" as atf +control ExecutorService as aes +control Annotator as ann +control Activity as activity + +== startup sequence == +caller -> ae**: create + ae -> aeh**: create + ae -> atf**: create(\w Exception Handler) + aeh -> atf: + ae -> aes**: create(\w Thread Factory) + atf -> aes: + +caller -> ae: startActivity() +activate ae + ae -> ann: Annotate Activity Start + + ae -> activity: initActivity() + activate activity + ae <- activity + deactivate activity + + note over ae,aes: align threadcount as explained below + +caller <- ae +deactivate ae + +== dynamic threadcount update == +note over ae, aes: threads can be changed dynamically + +caller -> ae: apply params +activate ae + ae->ae: align motor count + ae->aes: stop extra motors + ae->aes: + group for each new thread/motor + ae -> aes: execute() + activate aes + aes -> atf: get() + atf -> thread**: create + activate atf + aes <- atf: + deactivate atf + aes --> thread: run() + note over ann, thread: At this point, the\nmotor thread starts running\nthe defined activity's action\nover cycles + ae->ae: await thread state update + + ae<-aes: + deactivate aes + end group +caller <- ae +deactivate ae + +== shutdown sequence [after startup] == + +caller -> ae: stopActivity() +activate ae + + ae -> ae: request stop motors + ae -> ae: await all stop + + ae -> activity: shutdownActivity() + activate activity + ae <- activity + deactivate activity + + ae -> ann: Annotate Activity Finish + +caller <- ae +deactivate ae + +== on exception in motor thread == +thread -> aeh: catch() +aeh -> ae: notifyException\n(,) +activate ae + ae -> ae: save exception + ae -> ae: forceStopActivity() + ae -> aes: shutdown(); + activate aes + ae <- aes: + deactivate aes + + group if needed [after timeout]] + ae -> aes: shutdownNow(); + activate aes + ae <- aes + deactivate aes + end group + + ae -> activity: shutdownActivity(); + ae -> activity: closeAutoCloseables(); + + note over thread: action\nthread\nterminates + destroy thread +deactivate ae + + +@enduml diff --git a/devdocs/concurrency/async_scenario-Lifecycle_of_a_single_scenario_call__.svg b/devdocs/concurrency/async_scenario-Lifecycle_of_a_single_scenario_call__.svg new file mode 100644 index 000000000..3bb291402 --- /dev/null +++ b/devdocs/concurrency/async_scenario-Lifecycle_of_a_single_scenario_call__.svg @@ -0,0 +1,160 @@ + + +Lifecycle of a single scenario.call()callercallerScenarioScenarioControllerScriptingEngineActivityExecutorJavaRuntimeJavaRuntimeShutdownHookAnnotationsAnnotationscreateScenariocall()createShutdownHookregister(ShutdownHook)Annotate Scenario StartcreateScenarioControllercreateScriptingEnginerun(script)async calls[javacript+Java]scenario.(*)activities.(*)metrics.(*)params.(*)start(<activity>)createActivityExecutorstartActivity()resultawaitCompletion()for each activityawaitCompletion()unregister(ShutdownHook)run()Annotate Scenario FinishScenarioResulton exception during call()run()Annotate Scenario Finish diff --git a/devdocs/concurrency/async_scenario.puml b/devdocs/concurrency/async_scenario.puml new file mode 100644 index 000000000..eee38edf1 --- /dev/null +++ b/devdocs/concurrency/async_scenario.puml @@ -0,0 +1,67 @@ +@startuml +'https://plantuml.com/sequence-diagram + +title Lifecycle of a single scenario.call() +control "caller" as c +control "Scenario" as s +control "Scenario\nController" as sc +control "Scripting\nEngine" as engine +control "Activity\nExecutor" as ae +control "Java\nRuntime" as jrt +control "Shutdown\nHook" as sh +control "Annotations" as ann + +c -> s**: create + +c -> s: call() +activate s + + s -> sh**: create + s -> jrt: register(ShutdownHook) + s -> ann: Annotate Scenario Start + + s -> sc**: create + s -> engine**: create + + s -> engine: run(script) + activate engine + group async calls [javacript+Java] + engine <--> sc: scenario.(*) + engine <--> sc: activities.(*) + engine <--> sc: metrics.(*) + engine <--> sc: params.(*) + engine -> sc: start() + activate sc + sc -> ae**: create + sc -> ae: startActivity() + + deactivate sc + end group + s <- engine: result + deactivate engine + + s -> sc: awaitCompletion() + activate sc + group for each activity + sc -> ae: awaitCompletion() + activate ae + sc <- ae + deactivate ae + end group + + s <- sc + deactivate sc + + s -> jrt: unregister(ShutdownHook) + s -> sh: run() + sh -> ann: Annotate Scenario Finish + +c <- s: Scenario\nResult +deactivate s + +== on exception during call() == + jrt -> sh: run() + sh -> ann: Annotate Scenario Finish + + +@enduml diff --git a/devdocs/concurrency/async_scenarios-Lifecycle_of_Scenarios.svg b/devdocs/concurrency/async_scenarios-Lifecycle_of_Scenarios.svg new file mode 100644 index 000000000..b482fb579 --- /dev/null +++ b/devdocs/concurrency/async_scenarios-Lifecycle_of_Scenarios.svg @@ -0,0 +1,150 @@ + + +Lifecycle of ScenariosNBCLINBCLIScenarioScenarioControllerScenariosExecutorExceptionHandlerThreadFactoryExecutorServicefuturethreadcreateScenariosExecutorcreateExceptionHandlercreate ThreadFactory(w/ ExceptionHandler)ThreadFactorycreate ExecutorService(w/ ThreadFactory)ExecutorServicecreateScenariocreateScenarioControllerexecute(Scenario)submit(<Callable> Scenario)createfuture<Future<ScenarioResult>>[async] on thread from thread factoryget()createthread<thread>run taskcall()ScenarioResultresult[async] on NBCLI threadawaitAllResults();shutdownloop[timeout]awaitTermination(timeout)loop[each future]get()ScenarioResult<ScenariosResults> diff --git a/devdocs/concurrency/async_scenarios.puml b/devdocs/concurrency/async_scenarios.puml new file mode 100644 index 000000000..89410edd7 --- /dev/null +++ b/devdocs/concurrency/async_scenarios.puml @@ -0,0 +1,62 @@ +@startuml +'https://plantuml.com/sequence-diagram + +title Lifecycle of Scenarios + +control "NBCLI" as nbcli +control "Scenario" as s +control "Scenario\nController" as sc +control "Scenarios\nExecutor" as se +control "Exception\nHandler" as seh +control "Thread\nFactory" as stf +control "Executor\nService" as ses + +nbcli -> se** : create + se -> seh** : create + se -> stf** : create ThreadFactory\n(w/ ExceptionHandler) + se -> ses** : create ExecutorService\n(w/ ThreadFactory) + +nbcli -> s** : create + s -> sc** : create +nbcli --> se : execute(Scenario) +se --> ses: submit( Scenario) +activate ses + ses -> future**: create +se <-- ses: > +deactivate ses + +== [async] on thread from thread factory == +ses -> stf: get() + stf -> thread**: create +ses <- stf: +ses -> thread: run task +activate thread +thread -> s: call() +activate s +thread <- s: ScenarioResult +deactivate s +thread -> future: result +deactivate thread + +== [async] on NBCLI thread == + +nbcli -> se: awaitAllResults(); +activate se + se -> ses: shutdown + loop timeout + se -> ses: awaitTermination(timeout) + activate ses + se <- ses + deactivate ses + end loop + loop each future + se -> future: get() + activate future + se <- future: ScenarioResult + deactivate future + end loop + +nbcli <- se: +deactivate se + +@enduml diff --git a/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/core/RunState.java b/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/core/RunState.java index b78b8a247..15ca92a96 100644 --- a/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/core/RunState.java +++ b/engine-api/src/main/java/io/nosqlbench/engine/api/activityapi/core/RunState.java @@ -18,18 +18,36 @@ package io.nosqlbench.engine.api.activityapi.core; public enum RunState { - // Initial state after creation of this control + + /** + * Initial state after creation of this control + */ Uninitialized("i⌀"), - // This thread has been queued to run, but hasn't signaled yet that it is full started - // This must be set by the executor before executing the slot runnable + + /** + * This thread has been queued to run, but hasn't signaled yet that it is full started + * This must be set by the executor before executing the slot runnable + */ Starting("s⏫"), - // This thread is running. This should only be set by the controlled thread + + /** + * This thread is running. This should only be set by the controlled thread + */ Running("R\u23F5"), - // This thread has completed all of its activity, and will do no further work without new input + + /** + * This thread has completed all of its activity, and will do no further work without new input + */ Finished("F⏯"), - // The thread has been requested to stop. This says nothing of the internal state. + + /** + * The thread has been requested to stop. This says nothing of the internal state. + */ Stopping("s⏬"), - // The thread has stopped. This should only be set by the controlled thread + + /** + * The thread has stopped. This should only be set by the controlled thread + */ Stopped("_\u23F9"); private final String runcode; @@ -42,53 +60,25 @@ public enum RunState { return this.runcode; } - public boolean canTransitionTo(RunState to) { - switch (this) { - default: - return false; - case Uninitialized: // A motor was just created. This is its initial state. - case Stopped: - switch (to) { - case Starting: // a motor has been reserved for an execution command - return true; - default: - return false; - } - case Starting: - switch (to) { - case Running: // a motor has indicated that is in the run() method - case Finished: // a motor has exhausted its input, and has declined to go into started mode - return true; - default: - return false; - } - case Running: - switch (to) { - case Stopping: // A request was made to stop the motor before it finished - case Finished: // A motor has exhausted its input, and is finished with its work - return true; - default: - return false; - } - case Stopping: - switch (to) { - case Stopped: // A motor was stopped by request before exhausting input - return true; - default: - return false; - }// A motor was restarted after being stopped - case Finished: - switch (to) { - case Running: // A motor was restarted? - return true; - // not useful as of yet. - // Perhaps this will be allowed via explicit reset of input stream. - // If the input isn't reset, then trying to start a finished motor - // will cause it to short-circuit back to Finished state. - default: - return false; - } - } + /** + * @param target The target state + * @return true if the current state is allowed to transition to the target state + */ + public boolean canTransitionTo(RunState target) { + return switch (this) { + default -> false; // A motor was just created. This is its initial state. + case Uninitialized, Stopped -> (target == Starting); + case Starting -> switch (target) { // a motor has indicated that is in the run() method + case Running, Finished -> true;// a motor has exhausted its input, and has declined to go into started mode + default -> false; + }; + case Running -> switch (target) { // A request was made to stop the motor before it finished + case Stopping, Finished -> true;// A motor has exhausted its input, and is finished with its work + default -> false; + }; + case Stopping -> (target == Stopped); // A motor was stopped by request before exhausting input + case Finished -> (target == Running); // A motor was restarted? + }; } diff --git a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java index 1111d4507..08df6ddc6 100644 --- a/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java +++ b/engine-cli/src/main/java/io/nosqlbench/engine/cli/NBCLI.java @@ -418,7 +418,7 @@ public class NBCLI implements Function { // intentionally not shown for warn-only logger.info("console logging level is " + options.getConsoleLogLevel()); - ScenariosExecutor executor = new ScenariosExecutor("executor-" + sessionName, 1); + ScenariosExecutor scenariosExecutor = new ScenariosExecutor("executor-" + sessionName, 1); if (options.getConsoleLogLevel().isGreaterOrEqualTo(NBLogLevel.WARN)) { options.setWantsStackTraces(true); logger.debug("enabling stack traces since log level is " + options.getConsoleLogLevel()); @@ -466,7 +466,7 @@ public class NBCLI implements Function { scriptParams.putAll(buffer.getCombinedParams()); scenario.addScenarioScriptParams(scriptParams); - executor.execute(scenario); + scenariosExecutor.execute(scenario); // while (true) { // Optional pendingResult = executor.getPendingResult(scenario.getScenarioName()); @@ -476,7 +476,7 @@ public class NBCLI implements Function { // LockSupport.parkNanos(100000000L); // } - ScenariosResults scenariosResults = executor.awaitAllResults(); + ScenariosResults scenariosResults = scenariosExecutor.awaitAllResults(); logger.debug("Total of " + scenariosResults.getSize() + " result object returned from ScenariosExecutor"); ActivityMetrics.closeMetrics(options.wantsEnableChart()); diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityExceptionHandler.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityExceptionHandler.java index 5beb7643a..b088829f3 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityExceptionHandler.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityExceptionHandler.java @@ -23,9 +23,9 @@ public class ActivityExceptionHandler implements Thread.UncaughtExceptionHandler private static final Logger logger = LogManager.getLogger(ActivityExceptionHandler.class); - private final ActivityExecutor executor; + private final ActivityThreadsManager executor; - public ActivityExceptionHandler(ActivityExecutor executor) { + public ActivityExceptionHandler(ActivityThreadsManager executor) { this.executor = executor; logger.debug(() -> "Activity exception handler starting up for executor '" + executor + "'"); } diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityFinisher.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityFinisher.java index 845f7c272..5b8588d80 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityFinisher.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityFinisher.java @@ -22,11 +22,11 @@ import org.apache.logging.log4j.Logger; public class ActivityFinisher extends Thread { private final static Logger logger = LogManager.getLogger(ActivityFinisher.class); - private final ActivityExecutor executor; + private final ActivityThreadsManager executor; private final int timeout; private boolean result; - public ActivityFinisher(ActivityExecutor executor, int timeout) { + public ActivityFinisher(ActivityThreadsManager executor, int timeout) { super(executor.getActivityDef().getAlias() + "_finisher"); this.executor = executor; this.timeout = timeout; diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityStatus.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityStatus.java new file mode 100644 index 000000000..d77927fbc --- /dev/null +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityStatus.java @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.engine.core.lifecycle; + +public class ActivityStatus { +} diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityExecutor.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityThreadsManager.java similarity index 77% rename from engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityExecutor.java rename to engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityThreadsManager.java index 2ca5bfd71..afc53094e 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityExecutor.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ActivityThreadsManager.java @@ -26,11 +26,10 @@ import io.nosqlbench.engine.core.annotation.Annotators; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.util.*; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.SynchronousQueue; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.*; import java.util.stream.Collectors; /** @@ -49,9 +48,9 @@ import java.util.stream.Collectors; * */ -public class ActivityExecutor implements ActivityController, ParameterMap.Listener, ProgressCapable { +public class ActivityThreadsManager implements ActivityController, ParameterMap.Listener, ProgressCapable, Callable { - private static final Logger logger = LogManager.getLogger(ActivityExecutor.class); + private static final Logger logger = LogManager.getLogger(ActivityThreadsManager.class); private static final Logger activitylogger = LogManager.getLogger("ACTIVITY"); private final List> motors = new ArrayList<>(); @@ -68,25 +67,20 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen // private RunState intendedState = RunState.Uninitialized; - public ActivityExecutor(Activity activity, String sessionId) { + public ActivityThreadsManager(Activity activity, String sessionId) { this.activity = activity; this.activityDef = activity.getActivityDef(); executorService = new ThreadPoolExecutor( - 0, Integer.MAX_VALUE, - 0L, TimeUnit.SECONDS, - new SynchronousQueue<>(), - new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this)) + 0, Integer.MAX_VALUE, + 0L, TimeUnit.SECONDS, + new SynchronousQueue<>(), + new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this)) ); activity.getActivityDef().getParams().addListener(this); activity.setActivityController(this); this.sessionId = sessionId; } - public void setSessionId(String sessionId) { - this.sessionId = sessionId; - } - - // TODO: Doc how uninitialized activities do not propagate parameter map changes and how // TODO: this is different from preventing modification to uninitialized activities @@ -101,14 +95,14 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen public synchronized void startActivity() { logger.info("starting activity " + activity.getAlias() + " for cycles " + activity.getCycleSummary()); Annotators.recordAnnotation(Annotation.newBuilder() - .session(sessionId) - .now() - .layer(Layer.Activity) - .label("alias", getActivityDef().getAlias()) - .label("driver", getActivityDef().getActivityType()) - .label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none")) - .detail("params", getActivityDef().toString()) - .build() + .session(sessionId) + .now() + .layer(Layer.Activity) + .label("alias", getActivityDef().getAlias()) + .label("driver", getActivityDef().getActivityType()) + .label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none")) + .detail("params", getActivityDef().toString()) + .build() ); activitylogger.debug("START/before alias=(" + activity.getAlias() + ")"); @@ -130,7 +124,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen /** * Simply stop the motors */ - public synchronized void stopActivity() { + private synchronized void stopActivity() { activitylogger.debug("STOP/before alias=(" + activity.getAlias() + ")"); activity.setRunState(RunState.Stopping); @@ -142,19 +136,20 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen activity.setRunState(RunState.Stopped); logger.info("stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots"); activitylogger.debug("STOP/after alias=(" + activity.getAlias() + ")"); + Annotators.recordAnnotation(Annotation.newBuilder() - .session(sessionId) - .interval(this.startedAt, this.stoppedAt) - .layer(Layer.Activity) - .label("alias", getActivityDef().getAlias()) - .label("driver", getActivityDef().getActivityType()) - .label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none")) - .detail("params", getActivityDef().toString()) - .build() + .session(sessionId) + .interval(this.startedAt, this.stoppedAt) + .layer(Layer.Activity) + .label("alias", getActivityDef().getAlias()) + .label("driver", getActivityDef().getActivityType()) + .label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none")) + .detail("params", getActivityDef().toString()) + .build() ); } - public synchronized RuntimeException forceStopScenario(int initialMillisToWait) { + public RuntimeException forceStopActivity(int initialMillisToWait) { activitylogger.debug("FORCE STOP/before alias=(" + activity.getAlias() + ")"); activity.setRunState(RunState.Stopped); @@ -206,14 +201,14 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen * @param initialMillisToWait milliseconds to wait after graceful shutdownActivity request, before forcing * everything to stop */ - public synchronized void forceStopScenarioAndThrow(int initialMillisToWait, boolean rethrow) { - RuntimeException exception = forceStopScenario(initialMillisToWait); + private synchronized void forceStopScenarioAndThrow(int initialMillisToWait, boolean rethrow) { + RuntimeException exception = forceStopActivity(initialMillisToWait); if (exception != null && rethrow) { throw exception; } } - public boolean finishAndShutdownExecutor(int secondsToWait) { + private boolean finishAndShutdownExecutor(int secondsToWait) { activitylogger.debug("REQUEST STOP/before alias=(" + activity.getAlias() + ")"); logger.debug("Stopping executor for " + activity.getAlias() + " when work completes."); @@ -245,6 +240,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen if (stoppingException != null) { logger.trace(() -> "an exception caused the activity to stop:" + stoppingException.getMessage()); + logger.trace("Setting ERROR on activity executor: " + stoppingException.getMessage()); throw stoppingException; } @@ -270,10 +266,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen adjustToActivityDef(activity.getActivityDef()); } motors.stream() - .filter(m -> (m instanceof ActivityDefObserver)) + .filter(m -> (m instanceof ActivityDefObserver)) // .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Uninitialized) // .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting) - .forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef)); + .forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef)); } } @@ -288,25 +284,25 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen *

* TODO: move activity finisher thread to this class and remove separate implementation */ - public boolean awaitCompletion(int waitTime) { - logger.debug(()-> "awaiting completion of '" + this.getActivity().getAlias() + "'"); + private boolean awaitCompletion(int waitTime) { + logger.debug(() -> "awaiting completion of '" + this.getActivity().getAlias() + "'"); boolean finished = finishAndShutdownExecutor(waitTime); Annotators.recordAnnotation(Annotation.newBuilder() - .session(sessionId) - .interval(startedAt, this.stoppedAt) - .layer(Layer.Activity) - .label("alias", getActivityDef().getAlias()) - .label("driver", getActivityDef().getActivityType()) - .label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none")) - .detail("params", getActivityDef().toString()) - .build() + .session(sessionId) + .interval(startedAt, this.stoppedAt) + .layer(Layer.Activity) + .label("alias", getActivityDef().getAlias()) + .label("driver", getActivityDef().getActivityType()) + .label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none")) + .detail("params", getActivityDef().toString()) + .build() ); return finished; } - public boolean awaitFinish(int timeout) { + public boolean awaitFinishedOrStopped(int timeout) { activitylogger.debug("AWAIT-FINISH/before alias=(" + activity.getAlias() + ")"); boolean awaited = awaitAllRequiredMotorState(timeout, 50, RunState.Finished, RunState.Stopped); @@ -327,8 +323,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen private String getSlotStatus() { return motors.stream() - .map(m -> m.getSlotStateTracker().getSlotState().getCode()) - .collect(Collectors.joining(",", "[", "]")); + .map(m -> m.getSlotStateTracker().getSlotState().getCode()) + .collect(Collectors.joining(",", "[", "]")); } /** @@ -371,18 +367,18 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen case Running: case Starting: motors.stream() - .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Running) - .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Finished) - .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting) - .forEach(m -> { - m.getSlotStateTracker().enterState(RunState.Starting); - executorService.execute(m); - }); + .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Running) + .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Finished) + .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting) + .forEach(m -> { + m.getSlotStateTracker().enterState(RunState.Starting); + executorService.execute(m); + }); break; case Stopped: motors.stream() - .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Stopped) - .forEach(Motor::requestStop); + .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Stopped) + .forEach(Motor::requestStop); break; case Finished: case Stopping: @@ -421,31 +417,23 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen * Await a thread (aka motor/slot) entering a specific SlotState * * @param m motor instance - * @param waitTime milliseco`nds to wait, total + * @param waitTime milliseconds to wait, total * @param pollTime polling interval between state checks * @param desiredRunStates any desired SlotState * @return true, if the desired SlotState was detected */ - private boolean awaitMotorState(Motor m, int waitTime, int pollTime, RunState... desiredRunStates) { + private boolean awaitMotorState(Motor m, int waitTime, int pollTime, RunState... desiredRunStates) { long startedAt = System.currentTimeMillis(); while (System.currentTimeMillis() < (startedAt + waitTime)) { - Map actualStates = new HashMap<>(); - for (RunState state : desiredRunStates) { - actualStates.compute(state, (k, v) -> (v == null ? 0 : v) + 1); - } for (RunState desiredRunState : desiredRunStates) { - actualStates.remove(desiredRunState); - } - logger.trace(() -> "state of remaining slots:" + actualStates); - if (actualStates.size() == 0) { - return true; - } else { - System.out.println("motor states:" + actualStates); - try { - Thread.sleep(pollTime); - } catch (InterruptedException ignored) { + if (desiredRunState == m.getSlotStateTracker().getSlotState()) { + return true; } } + try { + Thread.sleep(pollTime); + } catch (InterruptedException ignored) { + } } logger.trace(() -> activityDef.getAlias() + "/Motor[" + m.getSlotId() + "] is now in state " + m.getSlotStateTracker().getSlotState()); return false; @@ -461,7 +449,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen awaited = awaitMotorState(motor, waitTime, pollTime, awaitingState); if (!awaited) { logger.trace(() -> "failed awaiting motor " + motor.getSlotId() + " for state in " + - Arrays.asList(awaitingState)); + Arrays.asList(awaitingState)); break; } } @@ -469,28 +457,6 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen return awaited; } - - private boolean awaitAnyRequiredMotorState(int waitTime, int pollTime, RunState... awaitingState) { - long startedAt = System.currentTimeMillis(); - while (System.currentTimeMillis() < (startedAt + waitTime)) { - for (Motor motor : motors) { - for (RunState state : awaitingState) { - if (motor.getSlotStateTracker().getSlotState() == state) { - logger.trace(() -> "at least one 'any' of " + activityDef.getAlias() + "/Motor[" + motor.getSlotId() + "] is now in state " + motor.getSlotStateTracker().getSlotState()); - return true; - } - } - } - try { - Thread.sleep(pollTime); - } catch (InterruptedException ignored) { - } - } - logger.trace(() -> "none of " + activityDef.getAlias() + "/Motor [" + motors.size() + "] is in states in " + Arrays.asList(awaitingState)); - return false; - } - - /** * Await a required thread (aka motor/slot) entering a specific SlotState * @@ -505,8 +471,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen boolean awaitedRequiredState = awaitMotorState(m, waitTime, pollTime, awaitingState); if (!awaitedRequiredState) { String error = "Unable to await " + activityDef.getAlias() + - "/Motor[" + m.getSlotId() + "]: from state " + startingState + " to " + m.getSlotStateTracker().getSlotState() - + " after waiting for " + waitTime + "ms"; + "/Motor[" + m.getSlotId() + "]: from state " + startingState + " to " + m.getSlotStateTracker().getSlotState() + + " after waiting for " + waitTime + "ms"; RuntimeException e = new RuntimeException(error); logger.error(error); throw e; @@ -532,7 +498,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen public synchronized void notifyException(Thread t, Throwable e) { logger.debug(() -> "Uncaught exception in activity thread forwarded to activity executor: " + e.getMessage()); this.stoppingException = new RuntimeException("Error in activity thread " + t.getName(), e); - forceStopScenario(10000); + forceStopActivity(10000); } @Override @@ -564,4 +530,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen } + @Override + public synchronized ExecResult call() throws Exception { + boolean stopped = awaitCompletion(Integer.MAX_VALUE); + ExecResult result = new ExecResult(startedAt, stoppedAt, "", this.stoppingException); + return result; + } } diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ScenarioResult.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ExecMetricsResult.java similarity index 68% rename from engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ScenarioResult.java rename to engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ExecMetricsResult.java index 54bf4d007..f20b943b5 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ScenarioResult.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ExecMetricsResult.java @@ -20,20 +20,16 @@ import com.codahale.metrics.*; import io.nosqlbench.api.engine.metrics.ActivityMetrics; import io.nosqlbench.engine.core.logging.Log4JMetricsReporter; import io.nosqlbench.engine.core.metrics.NBMetricsSummary; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.io.ByteArrayOutputStream; import java.io.PrintStream; import java.nio.charset.StandardCharsets; import java.util.HashSet; -import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; -public class ScenarioResult { +public class ExecMetricsResult extends ExecResult { - private final static Logger logger = LogManager.getLogger(ScenarioResult.class); public static final Set INTERVAL_ONLY_METRICS = Set.of( MetricAttribute.MIN, MetricAttribute.MAX, @@ -51,32 +47,12 @@ public class ScenarioResult { MetricAttribute.M5_RATE, MetricAttribute.M15_RATE ); - private final long startedAt; - private final long endedAt; - private final Exception exception; - private final String iolog; - - public ScenarioResult(Exception e, String iolog, long startedAt, long endedAt) { - logger.debug("populating "+(e==null? "NORMAL" : "ERROR")+" scenario result"); - if (logger.isDebugEnabled()) { - StackTraceElement[] st = Thread.currentThread().getStackTrace(); - for (int i = 0; i < st.length; i++) { - logger.debug(":AT " + st[i].getFileName()+":"+st[i].getLineNumber()+":"+st[i].getMethodName()); - if (i>10) break; - } - } - this.iolog = ((iolog!=null) ? iolog + "\n\n" : "") + (e!=null? e.getMessage() : ""); - this.startedAt = startedAt; - this.endedAt = endedAt; - this.exception = e; + public ExecMetricsResult(long startedAt, long endedAt, String iolog, Exception e) { + super(startedAt, endedAt, iolog, e); } - public void reportElapsedMillis() { - logger.info("-- SCENARIO TOOK " + getElapsedMillis() + "ms --"); - } - - public String getSummaryReport() { + public String getMetricsSummary() { ByteArrayOutputStream os = new ByteArrayOutputStream(); PrintStream ps = new PrintStream(os); ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(ActivityMetrics.getMetricRegistry()) @@ -91,45 +67,23 @@ public class ScenarioResult { builder.disabledMetricAttributes(disabled); ConsoleReporter consoleReporter = builder.build(); consoleReporter.report(); - ps.flush(); + consoleReporter.close(); String result = os.toString(StandardCharsets.UTF_8); return result; } public void reportToConsole() { - String summaryReport = getSummaryReport(); + String summaryReport = getMetricsSummary(); System.out.println(summaryReport); } - public Optional getException() { - return Optional.ofNullable(exception); + public void reportMetricsSummaryTo(PrintStream out) { + out.println(getMetricsSummary()); } - public void rethrowIfError() { - if (exception != null) { - if (exception instanceof RuntimeException) { - throw ((RuntimeException) exception); - } else { - throw new RuntimeException(exception); - } - } - } - - public String getIOLog() { - return this.iolog; - } - - public long getElapsedMillis() { - return endedAt - startedAt; - } - - public void reportTo(PrintStream out) { - out.println(getSummaryReport()); - } - - public void reportToLog() { + public void reportMetricsSummaryToLog() { logger.debug("-- WARNING: Metrics which are taken per-interval (like histograms) will not have --"); logger.debug("-- active data on this last report. (The workload has already stopped.) Record --"); logger.debug("-- metrics to an external format to see values for each reporting interval. --"); @@ -142,10 +96,11 @@ public class ScenarioResult { .outputTo(logger) .build(); reporter.report(); + reporter.close(); logger.debug("-- END METRICS DETAIL --"); } - public void reportCountsTo(PrintStream printStream) { + public void reportMetricsCountsTo(PrintStream printStream) { StringBuilder sb = new StringBuilder(); ActivityMetrics.getMetricRegistry().getMetrics().forEach((k, v) -> { diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ExecResult.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ExecResult.java new file mode 100644 index 000000000..db1454143 --- /dev/null +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ExecResult.java @@ -0,0 +1,68 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.engine.core.lifecycle; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.Optional; + +/** + * Provide a result type back to a caller, including the start and end times, + * any exception that occurred, and any content written to stdout or stderr equivalent + * IO streams. This is an execution result. + * + */ +public class ExecResult { + protected final static Logger logger = LogManager.getLogger(ExecMetricsResult.class); + protected final long startedAt; + protected final long endedAt; + protected final Exception exception; + protected final String iolog; + + public ExecResult(long startedAt, long endedAt, String iolog, Exception e) { + this.startedAt = startedAt; + this.endedAt = endedAt; + this.exception = e; + this.iolog = ((iolog != null) ? iolog + "\n\n" : "") + (e != null ? e.getMessage() : ""); + logger.debug("populating "+(e==null? "NORMAL" : "ERROR")+" scenario result"); + if (logger.isDebugEnabled()) { + StackTraceElement[] st = Thread.currentThread().getStackTrace(); + for (int i = 0; i < st.length; i++) { + logger.debug(":AT " + st[i].getFileName()+":"+st[i].getLineNumber()+":"+st[i].getMethodName()); + if (i>10) break; + } + } + + } + + public void reportElapsedMillisToLog() { + logger.info("-- SCENARIO TOOK " + getElapsedMillis() + "ms --"); + } + + public String getIOLog() { + return this.iolog; + } + + public long getElapsedMillis() { + return endedAt - startedAt; + } + + public Optional getException() { + return Optional.ofNullable(exception); + } +} diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ScenarioController.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ScenarioController.java index 0bda67c11..99929eeb4 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ScenarioController.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ScenarioController.java @@ -39,14 +39,15 @@ import java.util.regex.Pattern; import java.util.stream.Collectors; /** - * A ScenarioController provides a way to start Activities, modify them while running, and forceStopMotors, pause or restart them. + * A ScenarioController provides a way to start Activities, + * modify them while running, and forceStopMotors, pause or restart them. */ public class ScenarioController { private static final Logger logger = LogManager.getLogger(ScenarioController.class); private static final Logger scenariologger = LogManager.getLogger("SCENARIO"); - private final Map activityExecutors = new ConcurrentHashMap<>(); + private final Map activityExecutors = new ConcurrentHashMap<>(); private final String sessionId; private final Maturity minMaturity; @@ -72,10 +73,9 @@ public class ScenarioController { .build()); - ActivityExecutor activityExecutor = getActivityExecutor(activityDef, true); + ActivityThreadsManager activityThreadsManager = getActivityExecutor(activityDef, true); scenariologger.debug("START " + activityDef.getAlias()); - activityExecutor.startActivity(); - + activityThreadsManager.startActivity(); } /** @@ -120,12 +120,12 @@ public class ScenarioController { .detail("params", activityDef.toString()) .build()); - ActivityExecutor activityExecutor = getActivityExecutor(activityDef, true); + ActivityThreadsManager activityThreadsManager = getActivityExecutor(activityDef, true); scenariologger.debug("RUN alias=" + activityDef.getAlias()); scenariologger.debug(" (RUN/START) alias=" + activityDef.getAlias()); - activityExecutor.startActivity(); + activityThreadsManager.startActivity(); scenariologger.debug(" (RUN/AWAIT before) alias=" + activityDef.getAlias()); - boolean completed = activityExecutor.awaitCompletion(timeout); + boolean completed = activityThreadsManager.awaitCompletion(timeout); scenariologger.debug(" (RUN/AWAIT after) completed=" + activityDef.getAlias()); } @@ -154,8 +154,8 @@ public class ScenarioController { public boolean isRunningActivity(ActivityDef activityDef) { - ActivityExecutor activityExecutor = getActivityExecutor(activityDef, false); - return activityExecutor != null && activityExecutor.isRunning(); + ActivityThreadsManager activityThreadsManager = getActivityExecutor(activityDef, false); + return activityThreadsManager != null && activityThreadsManager.isRunning(); } public boolean isRunningActivity(Map activityDefMap) { @@ -180,18 +180,18 @@ public class ScenarioController { .detail("params", activityDef.toString()) .build()); - ActivityExecutor activityExecutor = getActivityExecutor(activityDef, false); - if (activityExecutor == null) { + ActivityThreadsManager activityThreadsManager = getActivityExecutor(activityDef, false); + if (activityThreadsManager == null) { throw new RuntimeException("could not stop missing activity:" + activityDef); } - RunState runstate = activityExecutor.getActivity().getRunState(); + RunState runstate = activityThreadsManager.getActivity().getRunState(); if (runstate != RunState.Running) { - logger.warn("NOT stopping activity '" + activityExecutor.getActivity().getAlias() + "' because it is in state '" + runstate + "'"); + logger.warn("NOT stopping activity '" + activityThreadsManager.getActivity().getAlias() + "' because it is in state '" + runstate + "'"); return; } scenariologger.debug("STOP " + activityDef.getAlias()); - activityExecutor.stopActivity(); + activityThreadsManager.stopActivity(); } /** @@ -240,8 +240,8 @@ public class ScenarioController { if (param.equals("alias")) { throw new InvalidParameterException("It is not allowed to change the name of an existing activity."); } - ActivityExecutor activityExecutor = getActivityExecutor(alias); - ParameterMap params = activityExecutor.getActivityDef().getParams(); + ActivityThreadsManager activityThreadsManager = getActivityExecutor(alias); + ParameterMap params = activityThreadsManager.getActivityDef().getParams(); scenariologger.debug("SET (" + alias + "/" + param + ")=(" + value + ")"); params.set(param, value); } @@ -261,7 +261,7 @@ public class ScenarioController { throw new BasicError("alias must be provided"); } - ActivityExecutor executor = activityExecutors.get(alias); + ActivityThreadsManager executor = activityExecutors.get(alias); if (executor == null) { logger.info("started scenario from apply:" + alias); @@ -290,8 +290,8 @@ public class ScenarioController { * @return the associated ActivityExecutor * @throws RuntimeException a runtime exception if the named activity is not found */ - private ActivityExecutor getActivityExecutor(String activityAlias) { - Optional executor = + private ActivityThreadsManager getActivityExecutor(String activityAlias) { + Optional executor = Optional.ofNullable(activityExecutors.get(activityAlias)); return executor.orElseThrow( () -> new RuntimeException("ActivityExecutor for alias " + activityAlias + " not found.") @@ -315,9 +315,9 @@ public class ScenarioController { return matching; } - private ActivityExecutor getActivityExecutor(ActivityDef activityDef, boolean createIfMissing) { + private ActivityThreadsManager getActivityExecutor(ActivityDef activityDef, boolean createIfMissing) { synchronized (activityExecutors) { - ActivityExecutor executor = activityExecutors.get(activityDef.getAlias()); + ActivityThreadsManager executor = activityExecutors.get(activityDef.getAlias()); if (executor == null && createIfMissing) { if (activityDef.getParams().containsKey("driver")) { @@ -333,7 +333,7 @@ public class ScenarioController { ) ); - executor = new ActivityExecutor( + executor = new ActivityThreadsManager( activityType.getAssembledActivity( activityDef, getActivityMap() @@ -342,7 +342,7 @@ public class ScenarioController { ); activityExecutors.put(activityDef.getAlias(), executor); } else { - executor = new ActivityExecutor( + executor = new ActivityThreadsManager( new StandardActivityType(activityDef).getAssembledActivity( activityDef, getActivityMap() ), this.sessionId @@ -392,7 +392,7 @@ public class ScenarioController { */ public List getActivityDefs() { return activityExecutors.values().stream() - .map(ActivityExecutor::getActivityDef) + .map(ActivityThreadsManager::getActivityDef) .collect(Collectors.toList()); } @@ -425,7 +425,8 @@ public class ScenarioController { /** * Await completion of all running activities, but do not force shutdownActivity. This method is meant to provide - * the blocking point for calling logic. It waits. + * the blocking point for calling logic. It waits. If there is an error which should propagate into the scenario, + * then it should be thrown from this method. * * @param waitTimeMillis The time to wait, usually set very high * @return true, if all activities completed before the timer expired, false otherwise @@ -436,7 +437,7 @@ public class ScenarioController { long remaining = waitTimeMillis; List finishers = new ArrayList<>(); - for (ActivityExecutor ae : activityExecutors.values()) { + for (ActivityThreadsManager ae : activityExecutors.values()) { ActivityFinisher finisher = new ActivityFinisher(ae, (int) remaining); finishers.add(finisher); finisher.start(); @@ -492,12 +493,12 @@ public class ScenarioController { } public boolean awaitActivity(ActivityDef activityDef) { - ActivityExecutor activityExecutor = getActivityExecutor(activityDef, false); - if (activityExecutor == null) { + ActivityThreadsManager activityThreadsManager = getActivityExecutor(activityDef, false); + if (activityThreadsManager == null) { throw new RuntimeException("Could not await missing activity: " + activityDef); } scenariologger.debug("AWAIT/before alias=" + activityDef.getAlias()); - boolean finished = activityExecutor.awaitFinish(Integer.MAX_VALUE); + boolean finished = activityThreadsManager.awaitFinishedOrStopped(Integer.MAX_VALUE); scenariologger.debug("AWAIT/after completed=" + finished); return finished; @@ -506,7 +507,7 @@ public class ScenarioController { /** * @return an unmodifyable String to executor map of all activities known to this scenario */ - public Map getActivityExecutorMap() { + public Map getActivityExecutorMap() { return Collections.unmodifiableMap(activityExecutors); } @@ -516,7 +517,7 @@ public class ScenarioController { private Map getActivityMap() { Map activityMap = new HashMap(); - for (Map.Entry entry : activityExecutors.entrySet()) { + for (Map.Entry entry : activityExecutors.entrySet()) { activityMap.put(entry.getKey(), entry.getValue().getActivity()); } return activityMap; @@ -524,7 +525,7 @@ public class ScenarioController { public List getProgressMeters() { List indicators = new ArrayList<>(); - for (ActivityExecutor ae : activityExecutors.values()) { + for (ActivityThreadsManager ae : activityExecutors.values()) { indicators.add(ae.getProgressMeter()); } indicators.sort(Comparator.comparing(ProgressMeterDisplay::getStartTime)); diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ScenariosResults.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ScenariosResults.java index 6eb5774ea..80c44b8c8 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ScenariosResults.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/ScenariosResults.java @@ -28,27 +28,26 @@ public class ScenariosResults { private static final Logger logger = LogManager.getLogger(ScenariosResults.class); private final String scenariosExecutorName; - private final Map scenarioResultMap = new LinkedHashMap<>(); + private final Map scenarioResultMap = new LinkedHashMap<>(); public ScenariosResults(ScenariosExecutor scenariosExecutor) { this.scenariosExecutorName = scenariosExecutor.getName(); } - public ScenariosResults(ScenariosExecutor scenariosExecutor, Map map) { + public ScenariosResults(ScenariosExecutor scenariosExecutor, Map map) { this.scenariosExecutorName = scenariosExecutor.getName(); scenarioResultMap.putAll(map); } public String getExecutionSummary() { - StringBuilder sb = new StringBuilder("executions: "); - sb.append(scenarioResultMap.size()).append(" scenarios, "); - sb.append(scenarioResultMap.values().stream().filter(r -> r.getException().isEmpty()).count()).append(" normal, "); - sb.append(scenarioResultMap.values().stream().filter(r -> r.getException().isPresent()).count()).append(" errored"); - return sb.toString(); + String sb = "executions: " + scenarioResultMap.size() + " scenarios, " + + scenarioResultMap.values().stream().filter(r -> r.getException().isEmpty()).count() + " normal, " + + scenarioResultMap.values().stream().filter(r -> r.getException().isPresent()).count() + " errored"; + return sb; } - public ScenarioResult getOne() { + public ExecMetricsResult getOne() { if (this.scenarioResultMap.size() != 1) { throw new RuntimeException("getOne found " + this.scenarioResultMap.size() + " results instead of 1."); } @@ -57,14 +56,14 @@ public class ScenariosResults { } public void reportToLog() { - for (Map.Entry entry : this.scenarioResultMap.entrySet()) { + for (Map.Entry entry : this.scenarioResultMap.entrySet()) { Scenario scenario = entry.getKey(); - ScenarioResult oresult = entry.getValue(); + ExecMetricsResult oresult = entry.getValue(); logger.info("results for scenario: " + scenario); if (oresult != null) { - oresult.reportElapsedMillis(); + oresult.reportElapsedMillisToLog(); } else { logger.error(scenario.getScenarioName() + ": incomplete (missing result)"); } diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/StartedActivityInfo.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/StartedActivityInfo.java new file mode 100644 index 000000000..28b7f76c9 --- /dev/null +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/StartedActivityInfo.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2022 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.engine.core.lifecycle; + +import io.nosqlbench.engine.api.activityapi.core.Activity; + +public class StartedActivityInfo { + private final Activity activity; + + StartedActivityInfo(Activity activity) { + this.activity = activity; + } + + +} diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/script/Scenario.java b/engine-core/src/main/java/io/nosqlbench/engine/core/script/Scenario.java index 93fb5183c..a087b444d 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/script/Scenario.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/script/Scenario.java @@ -27,9 +27,9 @@ import io.nosqlbench.engine.api.extensions.ScriptingPluginInfo; import io.nosqlbench.engine.api.scripting.ScriptEnvBuffer; import io.nosqlbench.engine.core.annotation.Annotators; import io.nosqlbench.engine.core.lifecycle.ActivityProgressIndicator; +import io.nosqlbench.engine.core.lifecycle.ExecMetricsResult; import io.nosqlbench.engine.core.lifecycle.PolyglotScenarioController; import io.nosqlbench.engine.core.lifecycle.ScenarioController; -import io.nosqlbench.engine.core.lifecycle.ScenarioResult; import io.nosqlbench.engine.core.metrics.PolyglotMetricRegistryBindings; import io.nosqlbench.nb.annotations.Maturity; import org.apache.logging.log4j.LogManager; @@ -58,7 +58,7 @@ import java.util.Optional; import java.util.concurrent.Callable; import java.util.stream.Collectors; -public class Scenario implements Callable { +public class Scenario implements Callable { private final String commandLine; private final String reportSummaryTo; @@ -71,9 +71,9 @@ public class Scenario implements Callable { private Exception error; private ScenarioMetadata scenarioMetadata; - private ScenarioResult result; + private ExecMetricsResult result; - public Optional getResultIfComplete() { + public Optional getResultIfComplete() { return Optional.ofNullable(this.result); } @@ -171,7 +171,7 @@ public class Scenario implements Callable { return this; } - private void initializeScriptingEngine() { + private void initializeScriptingEngine(ScenarioController scenarioController) { logger.debug("Using engine " + engine.toString()); MetricRegistry metricRegistry = ActivityMetrics.getMetricRegistry(); @@ -198,12 +198,10 @@ public class Scenario implements Callable { this.scriptEngine = GraalJSScriptEngine.create(polyglotEngine, contextSettings); - scenarioController = new ScenarioController(this.scenarioName, minMaturity); if (!progressInterval.equals("disabled")) { activityProgressIndicator = new ActivityProgressIndicator(scenarioController, progressInterval); } - scriptEnv = new ScenarioContext(scenarioController); scriptEngine.setContext(scriptEnv); @@ -264,9 +262,22 @@ public class Scenario implements Callable { .build() ); - initializeScriptingEngine(); logger.debug("Running control script for " + getScenarioName() + "."); + scenarioController = new ScenarioController(this.scenarioName, minMaturity); + initializeScriptingEngine(scenarioController); + executeScenarioScripts(); + long awaitCompletionTime = 86400 * 365 * 1000L; + logger.debug("Awaiting completion of scenario and activities for " + awaitCompletionTime + " millis."); + + scenarioController.awaitCompletion(awaitCompletionTime); + //TODO: Ensure control flow covers controller shutdown in event of internal error. + + Runtime.getRuntime().removeShutdownHook(scenarioShutdownHook); + scenarioShutdownHook.run(); + } + + private void executeScenarioScripts() { for (String script : scripts) { try { Object result = null; @@ -304,6 +315,7 @@ public class Scenario implements Callable { System.err.flush(); System.out.flush(); } catch (Exception e) { + this.error=e; this.state = State.Errored; logger.error("Error in scenario, shutting down. (" + e + ")"); try { @@ -311,7 +323,6 @@ public class Scenario implements Callable { } catch (Exception eInner) { logger.debug("Found inner exception while forcing stop with rethrow=false: " + eInner); } finally { - this.error = e; throw new RuntimeException(e); } } finally { @@ -320,14 +331,6 @@ public class Scenario implements Callable { endedAtMillis = System.currentTimeMillis(); } } - long awaitCompletionTime = 86400 * 365 * 1000L; - logger.debug("Awaiting completion of scenario and activities for " + awaitCompletionTime + " millis."); - scenarioController.awaitCompletion(awaitCompletionTime); - //TODO: Ensure control flow covers controller shutdown in event of internal error. - - Runtime.getRuntime().removeShutdownHook(scenarioShutdownHook); - scenarioShutdownHook = null; - finish(); } public void finish() { @@ -370,9 +373,23 @@ public class Scenario implements Callable { /** * This should be the only way to get a ScenarioResult for a Scenario. * + * The lifecycle of a scenario includes the lifecycles of all of the following: + *

    + *
  1. The scenario control script, executing within a graaljs context.
  2. + *
  3. The lifecycle of every activity which is started within the scenario.
  4. + *
+ * + * All of these run asynchronously within the scenario, however the same thread that calls + * the scenario is the one which executes the control script. A scenario ends when all + * of the following conditions are met: + *
    + *
  • The scenario control script has run to completion, or experienced an exception.
  • + *
  • Each activity has run to completion, experienced an exception, or all
  • + *
+ * * @return */ - public synchronized ScenarioResult call() { + public synchronized ExecMetricsResult call() { if (result == null) { try { runScenario(); @@ -386,15 +403,15 @@ public class Scenario implements Callable { } String iolog = scriptEnv.getTimedLog(); - this.result = new ScenarioResult(this.error, iolog, this.startedAtMillis, this.endedAtMillis); - result.reportToLog(); + this.result = new ExecMetricsResult(this.error, iolog, this.startedAtMillis, this.endedAtMillis); + result.reportMetricsSummaryToLog(); doReportSummaries(reportSummaryTo, result); } return result; } - private void doReportSummaries(String reportSummaryTo, ScenarioResult result) { + private void doReportSummaries(String reportSummaryTo, ExecMetricsResult result) { List fullChannels = new ArrayList<>(); List briefChannels = new ArrayList<>(); @@ -437,7 +454,7 @@ public class Scenario implements Callable { } } } - fullChannels.forEach(result::reportTo); + fullChannels.forEach(result::reportMetricsSummaryTo); // briefChannels.forEach(result::reportCountsTo); } diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/script/ScenariosExecutor.java b/engine-core/src/main/java/io/nosqlbench/engine/core/script/ScenariosExecutor.java index 0d6581266..dedaf4ff3 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/script/ScenariosExecutor.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/script/ScenariosExecutor.java @@ -50,7 +50,7 @@ public class ScenariosExecutor { if (submitted.get(scenario.getScenarioName()) != null) { throw new BasicError("Scenario " + scenario.getScenarioName() + " is already defined. Remove it first to reuse the name."); } - Future future = executor.submit(scenario); + Future future = executor.submit(scenario); SubmittedScenario s = new SubmittedScenario(scenario, future); submitted.put(s.getName(), s); } @@ -106,7 +106,7 @@ public class ScenariosExecutor { throw new RuntimeException("executor still runningScenarios after awaiting all results for " + timeout + "ms. isTerminated:" + executor.isTerminated() + " isShutdown:" + executor.isShutdown()); } - Map scenarioResultMap = new LinkedHashMap<>(); + Map scenarioResultMap = new LinkedHashMap<>(); getAsyncResultStatus() .entrySet() .forEach( @@ -133,26 +133,26 @@ public class ScenariosExecutor { * All submitted scenarios are included. Those which are still pending * are returned with an empty option.

* - *

Results may be exceptional. If {@link ScenarioResult#getException()} is present, + *

Results may be exceptional. If {@link ExecMetricsResult#getException()} is present, * then the result did not complete normally.

* * @return map of async results, with incomplete results as Optional.empty() */ - public Map> getAsyncResultStatus() { + public Map> getAsyncResultStatus() { - Map> optResults = new LinkedHashMap<>(); + Map> optResults = new LinkedHashMap<>(); for (SubmittedScenario submittedScenario : submitted.values()) { - Future resultFuture = submittedScenario.getResultFuture(); + Future resultFuture = submittedScenario.getResultFuture(); - Optional oResult = Optional.empty(); + Optional oResult = Optional.empty(); if (resultFuture.isDone()) { try { oResult = Optional.of(resultFuture.get()); } catch (Exception e) { long now = System.currentTimeMillis(); logger.debug("creating exceptional scenario result from getAsyncResultStatus"); - oResult = Optional.of(new ScenarioResult(e, "errored output", now, now)); + oResult = Optional.of(new ExecMetricsResult(now, now, "errored output", e)); } } @@ -179,7 +179,7 @@ public class ScenariosExecutor { * @param scenarioName the scenario name of interest * @return an optional result */ - public Optional> getPendingResult(String scenarioName) { + public Optional> getPendingResult(String scenarioName) { return Optional.ofNullable(submitted.get(scenarioName)).map(s -> s.resultFuture); } @@ -224,9 +224,9 @@ public class ScenariosExecutor { private static class SubmittedScenario { private final Scenario scenario; - private final Future resultFuture; + private final Future resultFuture; - SubmittedScenario(Scenario scenario, Future resultFuture) { + SubmittedScenario(Scenario scenario, Future resultFuture) { this.scenario = scenario; this.resultFuture = resultFuture; } @@ -235,7 +235,7 @@ public class ScenariosExecutor { return scenario; } - Future getResultFuture() { + Future getResultFuture() { return resultFuture; } diff --git a/engine-core/src/test/java/io/nosqlbench/engine/core/ActivityExecutorTest.java b/engine-core/src/test/java/io/nosqlbench/engine/core/ActivityThreadsManagerTest.java similarity index 94% rename from engine-core/src/test/java/io/nosqlbench/engine/core/ActivityExecutorTest.java rename to engine-core/src/test/java/io/nosqlbench/engine/core/ActivityThreadsManagerTest.java index a6e93e8d7..b82c1cd29 100644 --- a/engine-core/src/test/java/io/nosqlbench/engine/core/ActivityExecutorTest.java +++ b/engine-core/src/test/java/io/nosqlbench/engine/core/ActivityThreadsManagerTest.java @@ -28,7 +28,7 @@ import io.nosqlbench.engine.api.activityimpl.input.CoreInputDispenser; import io.nosqlbench.engine.api.activityimpl.input.AtomicInput; import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor; import io.nosqlbench.engine.api.activityimpl.motor.CoreMotorDispenser; -import io.nosqlbench.engine.core.lifecycle.ActivityExecutor; +import io.nosqlbench.engine.core.lifecycle.ActivityThreadsManager; import io.nosqlbench.engine.core.lifecycle.ActivityTypeLoader; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.LogManager; @@ -38,8 +38,8 @@ import java.util.Optional; import static org.assertj.core.api.Assertions.assertThat; -public class ActivityExecutorTest { - private static final Logger logger = LogManager.getLogger(ActivityExecutorTest.class); +public class ActivityThreadsManagerTest { + private static final Logger logger = LogManager.getLogger(ActivityThreadsManagerTest.class); @Test public synchronized void testRestart() { @@ -55,7 +55,7 @@ public class ActivityExecutorTest { a.setInputDispenserDelegate(idisp); a.setMotorDispenserDelegate(mdisp); - ActivityExecutor ae = new ActivityExecutor(a, "test-restart"); + ActivityThreadsManager ae = new ActivityThreadsManager(a, "test-restart"); ad.setThreads(1); ae.startActivity(); ae.stopActivity(); @@ -79,7 +79,7 @@ public class ActivityExecutorTest { a.setInputDispenserDelegate(idisp); a.setMotorDispenserDelegate(mdisp); - ActivityExecutor ae = new ActivityExecutor(a, "test-delayed-start"); + ActivityThreadsManager ae = new ActivityThreadsManager(a, "test-delayed-start"); ad.setThreads(1); ae.startActivity(); ae.awaitCompletion(15000); @@ -104,7 +104,7 @@ public class ActivityExecutorTest { a.setInputDispenserDelegate(idisp); a.setMotorDispenserDelegate(mdisp); - ActivityExecutor ae = new ActivityExecutor(a, "test-new-executor"); + ActivityThreadsManager ae = new ActivityThreadsManager(a, "test-new-executor"); ad.setThreads(5); ae.startActivity(); diff --git a/engine-rest/src/main/java/io/nosqlbench/engine/rest/resources/ScenarioExecutorEndpoint.java b/engine-rest/src/main/java/io/nosqlbench/engine/rest/resources/ScenarioExecutorEndpoint.java index 06a3f0d5e..52ecd9b22 100644 --- a/engine-rest/src/main/java/io/nosqlbench/engine/rest/resources/ScenarioExecutorEndpoint.java +++ b/engine-rest/src/main/java/io/nosqlbench/engine/rest/resources/ScenarioExecutorEndpoint.java @@ -21,7 +21,7 @@ import io.nosqlbench.engine.cli.BasicScriptBuffer; import io.nosqlbench.engine.cli.Cmd; import io.nosqlbench.engine.cli.NBCLICommandParser; import io.nosqlbench.engine.cli.ScriptBuffer; -import io.nosqlbench.engine.core.lifecycle.ScenarioResult; +import io.nosqlbench.engine.core.lifecycle.ExecMetricsResult; import io.nosqlbench.engine.core.script.Scenario; import io.nosqlbench.engine.core.script.ScenariosExecutor; import io.nosqlbench.engine.rest.services.WorkSpace; @@ -234,8 +234,8 @@ public class ScenarioExecutorEndpoint implements WebServiceObject { Optional pendingScenario = executor.getPendingScenario(scenarioName); if (pendingScenario.isPresent()) { - Optional> pendingResult = executor.getPendingResult(scenarioName); - Future scenarioResultFuture = pendingResult.get(); + Optional> pendingResult = executor.getPendingResult(scenarioName); + Future scenarioResultFuture = pendingResult.get(); return new LiveScenarioView(pendingScenario.get()); } else { throw new RuntimeException("Scenario name '" + scenarioName + "' not found."); diff --git a/engine-rest/src/main/java/io/nosqlbench/engine/rest/transfertypes/ResultView.java b/engine-rest/src/main/java/io/nosqlbench/engine/rest/transfertypes/ResultView.java index 1394ce7e6..e84da8113 100644 --- a/engine-rest/src/main/java/io/nosqlbench/engine/rest/transfertypes/ResultView.java +++ b/engine-rest/src/main/java/io/nosqlbench/engine/rest/transfertypes/ResultView.java @@ -16,13 +16,13 @@ package io.nosqlbench.engine.rest.transfertypes; -import io.nosqlbench.engine.core.lifecycle.ScenarioResult; +import io.nosqlbench.engine.core.lifecycle.ExecMetricsResult; public class ResultView { - private final ScenarioResult result; + private final ExecMetricsResult result; - public ResultView(ScenarioResult result) { + public ResultView(ExecMetricsResult result) { this.result = result; } diff --git a/nbr-examples/src/test/java/io/nosqlbench/nbr/examples/ScriptExampleTests.java b/nbr-examples/src/test/java/io/nosqlbench/nbr/examples/ScriptExampleTests.java index bc230712f..a84a712b2 100644 --- a/nbr-examples/src/test/java/io/nosqlbench/nbr/examples/ScriptExampleTests.java +++ b/nbr-examples/src/test/java/io/nosqlbench/nbr/examples/ScriptExampleTests.java @@ -16,7 +16,7 @@ package io.nosqlbench.nbr.examples; -import io.nosqlbench.engine.core.lifecycle.ScenarioResult; +import io.nosqlbench.engine.core.lifecycle.ExecMetricsResult; import io.nosqlbench.engine.core.lifecycle.ScenariosResults; import io.nosqlbench.engine.core.script.Scenario; import io.nosqlbench.engine.core.script.ScenariosExecutor; @@ -41,7 +41,7 @@ import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; public class ScriptExampleTests { - public static ScenarioResult runScenario(String scriptname, String... params) { + public static ExecMetricsResult runScenario(String scriptname, String... params) { if ((params.length % 2) != 0) { throw new RuntimeException("params must be pairwise key, value, ..."); } @@ -74,7 +74,7 @@ public class ScriptExampleTests { // s.addScriptText("load('classpath:scripts/async/" + scriptname + ".js');"); executor.execute(s); ScenariosResults scenariosResults = executor.awaitAllResults(); - ScenarioResult scenarioResult = scenariosResults.getOne(); + ExecMetricsResult scenarioResult = scenariosResults.getOne(); executor.shutdownNow(); return scenarioResult; } @@ -86,7 +86,7 @@ public class ScriptExampleTests { @Test public void testLinkedInput() { - ScenarioResult scenarioResult = runScenario("linkedinput"); + ExecMetricsResult scenarioResult = runScenario("linkedinput"); Pattern p = Pattern.compile(".*started leader.*started follower.*stopped leader.*stopped follower.*", Pattern.DOTALL); assertThat(p.matcher(scenarioResult.getIOLog()).matches()).isTrue(); @@ -94,14 +94,14 @@ public class ScriptExampleTests { @Test public void testExceptionPropagationFromMotorThread() { - ScenarioResult scenarioResult = runScenario("activityerror"); + ExecMetricsResult scenarioResult = runScenario("activityerror"); assertThat(scenarioResult.getException()).isPresent(); assertThat(scenarioResult.getException().get().getMessage()).contains("For input string: \"unparsable\""); } @Test public void testCycleRate() { - ScenarioResult scenarioResult = runScenario("cycle_rate"); + ExecMetricsResult scenarioResult = runScenario("cycle_rate"); String iolog = scenarioResult.getIOLog(); System.out.println("iolog\n" + iolog); Pattern p = Pattern.compile(".*mean cycle rate = (\\d[.\\d]+).*", Pattern.DOTALL); @@ -116,13 +116,13 @@ public class ScriptExampleTests { @Test public void testExtensionPoint() { - ScenarioResult scenarioResult = runScenario("extensions"); + ExecMetricsResult scenarioResult = runScenario("extensions"); assertThat(scenarioResult.getIOLog()).contains("sum is 46"); } @Test public void testOptimo() { - ScenarioResult scenarioResult = runScenario("optimo"); + ExecMetricsResult scenarioResult = runScenario("optimo"); String iolog = scenarioResult.getIOLog(); System.out.println("iolog\n" + iolog); assertThat(iolog).contains("map of result was"); @@ -130,14 +130,14 @@ public class ScriptExampleTests { @Test public void testExtensionCsvLogger() { - ScenarioResult scenarioResult = runScenario("extension_csvmetrics"); + ExecMetricsResult scenarioResult = runScenario("extension_csvmetrics"); assertThat(scenarioResult.getIOLog()).contains("started new " + "csvlogger: logs/csvmetricstestdir"); } @Test public void testScriptParamsVariable() { - ScenarioResult scenarioResult = runScenario("params_variable", "one", "two", "three", "four"); + ExecMetricsResult scenarioResult = runScenario("params_variable", "one", "two", "three", "four"); assertThat(scenarioResult.getIOLog()).contains("params[\"one\"]='two'"); assertThat(scenarioResult.getIOLog()).contains("params[\"three\"]='four'"); assertThat(scenarioResult.getIOLog()).contains("overridden[\"three\"] [overridden-three-five]='five'"); @@ -146,7 +146,7 @@ public class ScriptExampleTests { @Test public void testScriptParamsUndefVariableWithOverride() { - ScenarioResult scenarioResult = runScenario("undef_param", "one", "two", "three", "four"); + ExecMetricsResult scenarioResult = runScenario("undef_param", "one", "two", "three", "four"); assertThat(scenarioResult.getIOLog()).contains("before: params[\"three\"]:four"); assertThat(scenarioResult.getIOLog()).contains("before: params.three:four"); assertThat(scenarioResult.getIOLog()).contains("after: params[\"three\"]:undefined"); @@ -155,7 +155,7 @@ public class ScriptExampleTests { @Test public void testExtensionHistoStatsLogger() throws IOException { - ScenarioResult scenarioResult = runScenario("extension_histostatslogger"); + ExecMetricsResult scenarioResult = runScenario("extension_histostatslogger"); assertThat(scenarioResult.getIOLog()).contains("stdout started " + "logging to logs/histostats.csv"); List strings = Files.readAllLines(Paths.get( @@ -167,7 +167,7 @@ public class ScriptExampleTests { @Test public void testExtensionCsvOutput() throws IOException { - ScenarioResult scenarioResult = runScenario("extension_csvoutput"); + ExecMetricsResult scenarioResult = runScenario("extension_csvoutput"); List strings = Files.readAllLines(Paths.get( "logs/csvoutputtestfile.csv")); String logdata = strings.stream().collect(Collectors.joining("\n")); @@ -177,7 +177,7 @@ public class ScriptExampleTests { @Test public void testExtensionHistogramLogger() throws IOException { - ScenarioResult scenarioResult = runScenario("extension_histologger"); + ExecMetricsResult scenarioResult = runScenario("extension_histologger"); assertThat(scenarioResult.getIOLog()).contains("stdout started logging to hdrhistodata.log"); List strings = Files.readAllLines(Paths.get("hdrhistodata.log")); String logdata = strings.stream().collect(Collectors.joining("\n")); @@ -187,7 +187,7 @@ public class ScriptExampleTests { @Test public void testBlockingRun() { - ScenarioResult scenarioResult = runScenario("blockingrun"); + ExecMetricsResult scenarioResult = runScenario("blockingrun"); int a1end = scenarioResult.getIOLog().indexOf("blockingactivity1 finished"); int a2start = scenarioResult.getIOLog().indexOf("running blockingactivity2"); assertThat(a1end).isLessThan(a2start); @@ -195,12 +195,12 @@ public class ScriptExampleTests { @Test public void testAwaitFinished() { - ScenarioResult scenarioResult = runScenario("awaitfinished"); + ExecMetricsResult scenarioResult = runScenario("awaitfinished"); } @Test public void testStartStop() { - ScenarioResult scenarioResult = runScenario("startstopdiag"); + ExecMetricsResult scenarioResult = runScenario("startstopdiag"); int startedAt = scenarioResult.getIOLog().indexOf("starting activity teststartstopdiag"); int stoppedAt = scenarioResult.getIOLog().indexOf("stopped activity teststartstopdiag"); assertThat(startedAt).isGreaterThan(0); @@ -210,7 +210,7 @@ public class ScriptExampleTests { // TODO: find out why this causes a long delay after stop is called. @Test public void testThreadChange() { - ScenarioResult scenarioResult = runScenario("threadchange"); + ExecMetricsResult scenarioResult = runScenario("threadchange"); int changedTo1At = scenarioResult.getIOLog().indexOf("threads now 1"); int changedTo5At = scenarioResult.getIOLog().indexOf("threads now 5"); System.out.println("IOLOG:\n"+scenarioResult.getIOLog()); @@ -220,13 +220,13 @@ public class ScriptExampleTests { @Test public void testReadMetric() { - ScenarioResult scenarioResult = runScenario("readmetrics"); + ExecMetricsResult scenarioResult = runScenario("readmetrics"); assertThat(scenarioResult.getIOLog()).contains("count: "); } @Test public void testShutdownHook() { - ScenarioResult scenarioResult = runScenario("extension_shutdown_hook"); + ExecMetricsResult scenarioResult = runScenario("extension_shutdown_hook"); assertThat(scenarioResult.getIOLog()).doesNotContain("shutdown hook running").describedAs( "shutdown hooks should not run in the same IO context as the main scenario" ); @@ -234,7 +234,7 @@ public class ScriptExampleTests { @Test public void testExceptionPropagationFromActivityInit() { - ScenarioResult scenarioResult = runScenario("activityiniterror"); + ExecMetricsResult scenarioResult = runScenario("activityiniterror"); assertThat(scenarioResult.getException()).isPresent(); assertThat(scenarioResult.getException().get().getMessage()).contains("Unable to convert end cycle from invalid"); assertThat(scenarioResult.getException()).isNotNull(); @@ -242,7 +242,7 @@ public class ScriptExampleTests { @Test public void testReportedCoDelayBursty() { - ScenarioResult scenarioResult = runScenario("cocycledelay_bursty"); + ExecMetricsResult scenarioResult = runScenario("cocycledelay_bursty"); assertThat(scenarioResult.getIOLog()).contains("step1 metrics.waittime="); assertThat(scenarioResult.getIOLog()).contains("step2 metrics.waittime="); String iolog = scenarioResult.getIOLog(); @@ -252,7 +252,7 @@ public class ScriptExampleTests { @Test public void testReportedCoDelayStrict() { - ScenarioResult scenarioResult = runScenario("cocycledelay_strict"); + ExecMetricsResult scenarioResult = runScenario("cocycledelay_strict"); assertThat(scenarioResult.getIOLog()).contains("step1 cycles.waittime="); assertThat(scenarioResult.getIOLog()).contains("step2 cycles.waittime="); String iolog = scenarioResult.getIOLog(); @@ -263,14 +263,14 @@ public class ScriptExampleTests { @Test public void testCycleRateChangeNewMetrics() { - ScenarioResult scenarioResult = runScenario("cycle_rate_change"); + ExecMetricsResult scenarioResult = runScenario("cycle_rate_change"); String ioLog = scenarioResult.getIOLog(); assertThat(ioLog).contains("cycles adjusted, exiting on iteration"); } @Test public void testExitLogic() { - ScenarioResult scenarioResult = runScenario( + ExecMetricsResult scenarioResult = runScenario( "basicdiag", "type", "diag", "cyclerate", "5", "erroroncycle", "10", "cycles", "2000" ); diff --git a/nbr-examples/src/test/java/io/nosqlbench/nbr/examples/SpeedCheckIntegrationTests.java b/nbr-examples/src/test/java/io/nosqlbench/nbr/examples/SpeedCheckIntegrationTests.java index ee8fae02a..b750530d6 100644 --- a/nbr-examples/src/test/java/io/nosqlbench/nbr/examples/SpeedCheckIntegrationTests.java +++ b/nbr-examples/src/test/java/io/nosqlbench/nbr/examples/SpeedCheckIntegrationTests.java @@ -15,7 +15,7 @@ */ package io.nosqlbench.nbr.examples; -import io.nosqlbench.engine.core.lifecycle.ScenarioResult; +import io.nosqlbench.engine.core.lifecycle.ExecMetricsResult; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; @@ -30,14 +30,14 @@ public class SpeedCheckIntegrationTests { @Disabled // Verified as working public void testSpeedSanity() { - ScenarioResult scenarioResult = ScriptExampleTests.runScenario("speedcheck"); + ExecMetricsResult scenarioResult = ScriptExampleTests.runScenario("speedcheck"); } @Test @Disabled // This seems incomplete public void testThreadSpeeds() { - ScenarioResult scenarioResult = ScriptExampleTests.runScenario("threadspeeds"); + ExecMetricsResult scenarioResult = ScriptExampleTests.runScenario("threadspeeds"); } diff --git a/nbr/src/test/java/io/nosqlbench/cli/testing/ExitStatusIntegrationTests.java b/nbr/src/test/java/io/nosqlbench/cli/testing/ExitStatusIntegrationTests.java index 70a163eaf..c1b580e24 100644 --- a/nbr/src/test/java/io/nosqlbench/cli/testing/ExitStatusIntegrationTests.java +++ b/nbr/src/test/java/io/nosqlbench/cli/testing/ExitStatusIntegrationTests.java @@ -86,18 +86,18 @@ class ExitStatusIntegrationTests { assertThat(result.exitStatus).isEqualTo(2); } -// This will not work reliablyl until the activity shutdown bug is fixed. -// @Test -// public void testCloseErrorHandlerOnSpace() { -// ProcessInvoker invoker = new ProcessInvoker(); -// invoker.setLogDir("logs/test"); -// ProcessResult result = invoker.run("exitstatus_erroronclose", 30, -// java, "-jar", JARNAME, "--logs-dir", "logs/test/error_on_close", "run", -// "driver=diag", "threads=2", "rate=5", "op=noop", "cycles=10", "erroronclose=true", "-vvv" -// ); -// String stdout = String.join("\n", result.getStdoutData()); -// String stderr = String.join("\n", result.getStderrData()); -// assertThat(result.exception).isNotNull(); -// assertThat(result.exception.getMessage()).contains("diag space was configured to throw"); -// } + @Test + public void testCloseErrorHandlerOnSpace() { + ProcessInvoker invoker = new ProcessInvoker(); + invoker.setLogDir("logs/test"); + ProcessResult result = invoker.run("exitstatus_erroronclose", 30, + java, "-jar", JARNAME, "--logs-dir", "logs/test/error_on_close", "run", + "driver=diag", "threads=2", "rate=5", "op=noop", "cycles=10", "erroronclose=true", "-vvv" + ); + String stdout = String.join("\n", result.getStdoutData()); + String stderr = String.join("\n", result.getStderrData()); + assertThat(result.exception).isNotNull(); + assertThat(result.exception.getMessage()).contains("diag space was configured to throw"); + } + }