Merge pull request #880 from nosqlbench/nosqlbench-797-callable

nosqlbench-797-callable
This commit is contained in:
Jonathan Shook 2022-12-22 18:02:45 -06:00 committed by GitHub
commit dd1fc35d35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 26 additions and 92 deletions

View File

@ -224,7 +224,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
try { try {
closeable.close(); closeable.close();
} catch (Exception e) { } catch (Exception e) {
throw new RuntimeException("Error closing " + closeable); throw new RuntimeException("Error closing " + closeable + ": " + e, e);
} }
} }
closeables.clear(); closeables.clear();

View File

@ -469,15 +469,6 @@ public class NBCLI implements Function<String[], Integer> {
scenario.addScenarioScriptParams(scriptParams); scenario.addScenarioScriptParams(scriptParams);
scenariosExecutor.execute(scenario); scenariosExecutor.execute(scenario);
// while (true) {
// Optional<ScenarioResult> pendingResult = executor.getPendingResult(scenario.getScenarioName());
// if (pendingResult.isPresent()) {
// break;
// }
// LockSupport.parkNanos(100000000L);
// }
ScenariosResults scenariosResults = scenariosExecutor.awaitAllResults(); ScenariosResults scenariosResults = scenariosExecutor.awaitAllResults();
logger.debug(() -> "Total of " + scenariosResults.getSize() + " result object returned from ScenariosExecutor"); logger.debug(() -> "Total of " + scenariosResults.getSize() + " result object returned from ScenariosExecutor");

View File

@ -54,21 +54,21 @@ public class ExecutionMetricsResult extends ExecutionResult {
public String getMetricsSummary() { public String getMetricsSummary() {
ByteArrayOutputStream os = new ByteArrayOutputStream(); ByteArrayOutputStream os = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(os); try (PrintStream ps = new PrintStream(os)) {
ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(ActivityMetrics.getMetricRegistry()) ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(ActivityMetrics.getMetricRegistry())
.convertDurationsTo(TimeUnit.MICROSECONDS) .convertDurationsTo(TimeUnit.MICROSECONDS)
.convertRatesTo(TimeUnit.SECONDS) .convertRatesTo(TimeUnit.SECONDS)
.filter(MetricFilter.ALL) .filter(MetricFilter.ALL)
.outputTo(ps); .outputTo(ps);
Set<MetricAttribute> disabled = new HashSet<>(INTERVAL_ONLY_METRICS); Set<MetricAttribute> disabled = new HashSet<>(INTERVAL_ONLY_METRICS);
if (this.getElapsedMillis()<60000) { if (this.getElapsedMillis()<60000) {
disabled.addAll(OVER_ONE_MINUTE_METRICS); disabled.addAll(OVER_ONE_MINUTE_METRICS);
}
builder.disabledMetricAttributes(disabled);
ConsoleReporter consoleReporter = builder.build();
consoleReporter.report();
consoleReporter.close();
} }
builder.disabledMetricAttributes(disabled);
ConsoleReporter consoleReporter = builder.build();
consoleReporter.report();
ps.flush();
consoleReporter.close();
String result = os.toString(StandardCharsets.UTF_8); String result = os.toString(StandardCharsets.UTF_8);
return result; return result;
} }

View File

@ -26,7 +26,7 @@ import org.apache.logging.log4j.Logger;
* *
*/ */
public class ExecutionResult { public class ExecutionResult {
protected final static Logger logger = LogManager.getLogger(ExecutionMetricsResult.class); protected final static Logger logger = LogManager.getLogger(ExecutionResult.class);
protected final long startedAt; protected final long startedAt;
protected final long endedAt; protected final long endedAt;
protected final Exception exception; protected final Exception exception;

View File

@ -64,14 +64,9 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private final RunStateTally tally; private final RunStateTally tally;
private ExecutorService executorService; private ExecutorService executorService;
private Exception exception; private Exception exception;
private final static int waitTime = 10000;
private String sessionId = ""; private String sessionId = "";
private long startedAt = 0L; private long startedAt = 0L;
private long stoppedAt = 0L; private long stoppedAt = 0L;
private String[] annotatedCommand;
// private RunState intendedState = RunState.Uninitialized;
public ActivityExecutor(Activity activity, String sessionId) { public ActivityExecutor(Activity activity, String sessionId) {
this.activity = activity; this.activity = activity;
@ -199,31 +194,6 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
return activityDef; return activityDef;
} }
/**
* This is the canonical way to wait for an activity to finish. It ties together
* any way that an activity can finish under one blocking call.
* This should be awaited asynchronously from the control layer in separate threads.
* <p>
* TODO: move activity finisher thread to this class and remove separate implementation
*/
private boolean awaitCompletion(int waitTime) {
logger.debug(() -> "awaiting completion of '" + this.getActivity().getAlias() + "'");
boolean finished = shutdownExecutorService(waitTime);
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.interval(startedAt, this.stoppedAt)
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
);
return finished;
}
public String toString() { public String toString() {
return getClass().getSimpleName() + "~" + activityDef.getAlias(); return getClass().getSimpleName() + "~" + activityDef.getAlias();
} }
@ -438,7 +408,6 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
wasStopped = executorService.awaitTermination(secondsToWait, TimeUnit.SECONDS); wasStopped = executorService.awaitTermination(secondsToWait, TimeUnit.SECONDS);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
logger.trace("interrupted while awaiting termination"); logger.trace("interrupted while awaiting termination");
wasStopped = false;
logger.warn("while waiting termination of shutdown " + activity.getAlias() + ", " + ie.getMessage()); logger.warn("while waiting termination of shutdown " + activity.getAlias() + ", " + ie.getMessage());
activitylogger.debug("REQUEST STOP/exception alias=(" + activity.getAlias() + ") wasstopped=" + wasStopped); activitylogger.debug("REQUEST STOP/exception alias=(" + activity.getAlias() + ") wasstopped=" + wasStopped);
} catch (RuntimeException e) { } catch (RuntimeException e) {

View File

@ -32,13 +32,10 @@ import java.util.concurrent.TimeoutException;
public class ActivityRuntimeInfo implements ProgressCapable { public class ActivityRuntimeInfo implements ProgressCapable {
private final static Logger logger = LogManager.getLogger(ActivityRuntimeInfo.class); private final static Logger logger = LogManager.getLogger(ActivityRuntimeInfo.class);
private final Activity activity; private final Activity activity;
private final Future<ExecutionResult> future; private final Future<ExecutionResult> future;
private final ActivityExecutor executor; private final ActivityExecutor executor;
private ExecutionResult result;
public ActivityRuntimeInfo(Activity activity, Future<ExecutionResult> result, ActivityExecutor executor) { public ActivityRuntimeInfo(Activity activity, Future<ExecutionResult> result, ActivityExecutor executor) {
this.activity = activity; this.activity = activity;
@ -60,6 +57,7 @@ public class ActivityRuntimeInfo implements ProgressCapable {
* Wait until the execution is complete and return the result. * Wait until the execution is complete and return the result.
* @param timeoutMillis * @param timeoutMillis
* @return null, or an ExecutionResult if the execution completed * @return null, or an ExecutionResult if the execution completed
* @throws RuntimeException if there was an execution exception
*/ */
public ExecutionResult awaitResult(long timeoutMillis) { public ExecutionResult awaitResult(long timeoutMillis) {
ExecutionResult result = null; ExecutionResult result = null;
@ -70,7 +68,6 @@ public class ActivityRuntimeInfo implements ProgressCapable {
logger.warn("interrupted waiting for execution to complete"); logger.warn("interrupted waiting for execution to complete");
} catch (ExecutionException e) { } catch (ExecutionException e) {
throw new RuntimeException(e); throw new RuntimeException(e);
// return new ExecutionResult(activity.getStartedAtMillis(),System.currentTimeMillis(),"",e);
} }
return result; return result;
} }

View File

@ -1,20 +0,0 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.core.lifecycle.activity;
public class ActivityStatus {
}

View File

@ -26,14 +26,14 @@ import io.nosqlbench.api.metadata.SystemId;
import io.nosqlbench.engine.api.extensions.ScriptingPluginInfo; import io.nosqlbench.engine.api.extensions.ScriptingPluginInfo;
import io.nosqlbench.engine.api.scripting.ScriptEnvBuffer; import io.nosqlbench.engine.api.scripting.ScriptEnvBuffer;
import io.nosqlbench.engine.core.annotation.Annotators; import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.engine.core.lifecycle.activity.ActivityProgressIndicator;
import io.nosqlbench.engine.core.lifecycle.ExecutionMetricsResult; import io.nosqlbench.engine.core.lifecycle.ExecutionMetricsResult;
import io.nosqlbench.engine.core.lifecycle.scenario.script.bindings.PolyglotScenarioController; import io.nosqlbench.engine.core.lifecycle.activity.ActivityProgressIndicator;
import io.nosqlbench.engine.core.lifecycle.scenario.script.bindings.ActivityBindings;
import io.nosqlbench.engine.core.lifecycle.scenario.script.SandboxExtensionFinder; import io.nosqlbench.engine.core.lifecycle.scenario.script.SandboxExtensionFinder;
import io.nosqlbench.engine.core.lifecycle.scenario.script.ScenarioContext; import io.nosqlbench.engine.core.lifecycle.scenario.script.ScenarioContext;
import io.nosqlbench.engine.core.lifecycle.scenario.script.ScriptParams; import io.nosqlbench.engine.core.lifecycle.scenario.script.ScriptParams;
import io.nosqlbench.engine.core.lifecycle.scenario.script.bindings.ActivityBindings;
import io.nosqlbench.engine.core.lifecycle.scenario.script.bindings.PolyglotMetricRegistryBindings; import io.nosqlbench.engine.core.lifecycle.scenario.script.bindings.PolyglotMetricRegistryBindings;
import io.nosqlbench.engine.core.lifecycle.scenario.script.bindings.PolyglotScenarioController;
import io.nosqlbench.nb.annotations.Maturity; import io.nosqlbench.nb.annotations.Maturity;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -45,7 +45,6 @@ import org.graalvm.polyglot.PolyglotAccess;
import javax.script.Compilable; import javax.script.Compilable;
import javax.script.CompiledScript; import javax.script.CompiledScript;
import javax.script.ScriptEngine; import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import java.io.*; import java.io.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
@ -88,8 +87,6 @@ public class Scenario implements Callable<ExecutionMetricsResult> {
Interrupted, Interrupted,
Finished Finished
} }
private static final ScriptEngineManager engineManager = new ScriptEngineManager();
private final List<String> scripts = new ArrayList<>(); private final List<String> scripts = new ArrayList<>();
private ScriptEngine scriptEngine; private ScriptEngine scriptEngine;
private ScenarioController scenarioController; private ScenarioController scenarioController;
@ -266,7 +263,7 @@ public class Scenario implements Callable<ExecutionMetricsResult> {
); );
logger.debug("Running control script for " + getScenarioName() + "."); logger.debug("Running control script for " + getScenarioName() + ".");
scenarioController = new ScenarioController(this,minMaturity); scenarioController = new ScenarioController(this);
try { try {
initializeScriptingEngine(scenarioController); initializeScriptingEngine(scenarioController);
executeScenarioScripts(); executeScenarioScripts();

View File

@ -26,7 +26,6 @@ import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.engine.core.lifecycle.ExecutionResult; import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory; import io.nosqlbench.engine.core.lifecycle.IndexedThreadFactory;
import io.nosqlbench.engine.core.lifecycle.activity.*; import io.nosqlbench.engine.core.lifecycle.activity.*;
import io.nosqlbench.nb.annotations.Maturity;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@ -48,13 +47,11 @@ public class ScenarioController {
private final Map<String, ActivityRuntimeInfo> activityInfoMap = new ConcurrentHashMap<>(); private final Map<String, ActivityRuntimeInfo> activityInfoMap = new ConcurrentHashMap<>();
private final Scenario scenario; private final Scenario scenario;
private final Maturity minMaturity;
private final ExecutorService activitiesExecutor; private final ExecutorService activitiesExecutor;
public ScenarioController(Scenario scenario, Maturity minMaturity) { public ScenarioController(Scenario scenario) {
this.scenario = scenario; this.scenario = scenario;
this.minMaturity = minMaturity;
this.activityLoader = new ActivityLoader(scenario); this.activityLoader = new ActivityLoader(scenario);
ActivitiesExceptionHandler exceptionHandler = new ActivitiesExceptionHandler(this); ActivitiesExceptionHandler exceptionHandler = new ActivitiesExceptionHandler(this);
@ -419,16 +416,19 @@ public class ScenarioController {
} }
public void shutdown() { public void shutdown() {
logger.debug(() -> "Requesting ScenarioController shutdown.");
this.activitiesExecutor.shutdown(); this.activitiesExecutor.shutdown();
try { try {
if (!this.activitiesExecutor.awaitTermination(5, TimeUnit.SECONDS)) { if (!this.activitiesExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
logger.info(() -> "Scenario is being forced to shutdown after waiting 5 seconds for graceful shutdown.");
this.activitiesExecutor.shutdownNow(); this.activitiesExecutor.shutdownNow();
if (!this.activitiesExecutor.awaitTermination(5, TimeUnit.SECONDS)) { if (!this.activitiesExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
throw new RuntimeException("Unable to shutdown activities executor"); throw new RuntimeException("Unable to shutdown activities executor");
} }
} }
} catch (Exception e) { } catch (Exception e) {
logger.warn("There was an exception while trying to shutdown the ScenarioController:" + e,e);
throw new RuntimeException(e);
} }
} }
} }