mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
additional diagnostics for spurious test
This commit is contained in:
@@ -286,10 +286,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
|
||||
* any way that an activity can finish under one blocking call.
|
||||
* This should be awaited asynchronously from the control layer in separate threads.
|
||||
* <p>
|
||||
* TODO: move activity finisher threaad to this class and remove separate implementation
|
||||
* TODO: move activity finisher thread to this class and remove separate implementation
|
||||
*/
|
||||
public boolean awaitCompletion(int waitTime) {
|
||||
|
||||
logger.debug(()-> "awaiting completion of '" + this.getActivity().getAlias() + "'");
|
||||
boolean finished = finishAndShutdownExecutor(waitTime);
|
||||
|
||||
Annotators.recordAnnotation(Annotation.newBuilder()
|
||||
|
||||
@@ -16,7 +16,11 @@
|
||||
|
||||
package io.nosqlbench.engine.core.lifecycle;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
public class ActivityFinisher extends Thread {
|
||||
private final static Logger logger = LogManager.getLogger(ActivityFinisher.class);
|
||||
|
||||
private final ActivityExecutor executor;
|
||||
private final int timeout;
|
||||
@@ -30,10 +34,17 @@ public class ActivityFinisher extends Thread {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
logger.debug(this + " awaiting async completion of " + executor.getActivity().getAlias() + " on " + executor + " for timeout " + timeout);
|
||||
result = executor.awaitCompletion(timeout);
|
||||
logger.debug(this + " awaited async completion of " + executor.getActivity().getAlias());
|
||||
}
|
||||
|
||||
public boolean getResult() {
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.getClass().getSimpleName()+"/" + executor.getActivity().getAlias();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -431,6 +431,7 @@ public class ScenarioController {
|
||||
* @return true, if all activities completed before the timer expired, false otherwise
|
||||
*/
|
||||
public boolean awaitCompletion(long waitTimeMillis) {
|
||||
logger.debug(() -> "awaiting completion");
|
||||
boolean completed = true;
|
||||
long remaining = waitTimeMillis;
|
||||
|
||||
@@ -443,7 +444,9 @@ public class ScenarioController {
|
||||
|
||||
for (ActivityFinisher finisher : finishers) {
|
||||
try {
|
||||
logger.debug("joining finisher " + finisher.getName());
|
||||
finisher.join(waitTimeMillis);
|
||||
logger.debug("joined finisher " + finisher.getName());
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
}
|
||||
|
||||
@@ -58,12 +58,28 @@ public class ScenarioResult {
|
||||
private final String iolog;
|
||||
|
||||
public ScenarioResult(String iolog, long startedAt, long endedAt) {
|
||||
logger.debug("populating result from iolog");
|
||||
if (logger.isDebugEnabled()) {
|
||||
StackTraceElement[] st = Thread.currentThread().getStackTrace();
|
||||
for (int i = 0; i < st.length; i++) {
|
||||
logger.debug(":AT " + st[i].getFileName()+":"+st[i].getLineNumber()+":"+st[i].getMethodName());
|
||||
if (i>10) break;
|
||||
}
|
||||
}
|
||||
this.iolog = iolog;
|
||||
this.startedAt = startedAt;
|
||||
this.endedAt = endedAt;
|
||||
}
|
||||
|
||||
public ScenarioResult(Exception e, long startedAt, long endedAt) {
|
||||
logger.debug("populating result from exception");
|
||||
if (logger.isDebugEnabled()) {
|
||||
StackTraceElement[] st = Thread.currentThread().getStackTrace();
|
||||
for (int i = 0; i < st.length; i++) {
|
||||
logger.debug(":AT " + st[i].getFileName()+":"+st[i].getLineNumber()+":"+st[i].getMethodName());
|
||||
if (i>10) break;
|
||||
}
|
||||
}
|
||||
this.iolog = e.getMessage();
|
||||
this.startedAt = startedAt;
|
||||
this.endedAt = endedAt;
|
||||
|
||||
@@ -27,7 +27,6 @@ import java.util.Map;
|
||||
public class ScenariosResults {
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(ScenariosResults.class);
|
||||
|
||||
private final String scenariosExecutorName;
|
||||
private final Map<Scenario, ScenarioResult> scenarioResultMap = new LinkedHashMap<>();
|
||||
|
||||
@@ -77,4 +76,8 @@ public class ScenariosResults {
|
||||
return this.scenarioResultMap.values().stream()
|
||||
.anyMatch(r -> r.getException().isPresent());
|
||||
}
|
||||
|
||||
public int getSize() {
|
||||
return this.scenarioResultMap.size();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,9 +40,9 @@ public class ScenariosExecutor {
|
||||
|
||||
public ScenariosExecutor(String name, int threads) {
|
||||
executor = new ThreadPoolExecutor(1, threads,
|
||||
0L, TimeUnit.MILLISECONDS,
|
||||
new LinkedBlockingQueue<>(),
|
||||
new IndexedThreadFactory("scenarios", new ScenarioExceptionHandler(this)));
|
||||
0L, TimeUnit.MILLISECONDS,
|
||||
new LinkedBlockingQueue<>(),
|
||||
new IndexedThreadFactory("scenarios", new ScenarioExceptionHandler(this)));
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@@ -89,7 +89,6 @@ public class ScenariosExecutor {
|
||||
long waitedAt = System.currentTimeMillis();
|
||||
long updateAt = Math.min(timeoutAt, waitedAt + updateInterval);
|
||||
while (!isShutdown && System.currentTimeMillis() < timeoutAt) {
|
||||
|
||||
while (!isShutdown && System.currentTimeMillis() < updateAt) {
|
||||
try {
|
||||
long timeRemaining = updateAt - System.currentTimeMillis();
|
||||
@@ -105,11 +104,17 @@ public class ScenariosExecutor {
|
||||
|
||||
if (!isShutdown) {
|
||||
throw new RuntimeException("executor still runningScenarios after awaiting all results for " + timeout
|
||||
+ "ms. isTerminated:" + executor.isTerminated() + " isShutdown:" + executor.isShutdown());
|
||||
+ "ms. isTerminated:" + executor.isTerminated() + " isShutdown:" + executor.isShutdown());
|
||||
}
|
||||
Map<Scenario, ScenarioResult> scenarioResultMap = new LinkedHashMap<>();
|
||||
getAsyncResultStatus()
|
||||
.entrySet().forEach(es -> scenarioResultMap.put(es.getKey(), es.getValue().orElse(null)));
|
||||
.entrySet()
|
||||
.forEach(
|
||||
es -> scenarioResultMap.put(
|
||||
es.getKey(),
|
||||
es.getValue().orElse(null)
|
||||
)
|
||||
);
|
||||
return new ScenariosResults(this, scenarioResultMap);
|
||||
}
|
||||
|
||||
@@ -118,9 +123,9 @@ public class ScenariosExecutor {
|
||||
*/
|
||||
public List<String> getPendingScenarios() {
|
||||
return new ArrayList<>(
|
||||
submitted.values().stream()
|
||||
.map(SubmittedScenario::getName)
|
||||
.collect(Collectors.toCollection(ArrayList::new)));
|
||||
submitted.values().stream()
|
||||
.map(SubmittedScenario::getName)
|
||||
.collect(Collectors.toCollection(ArrayList::new)));
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -146,6 +151,7 @@ public class ScenariosExecutor {
|
||||
oResult = Optional.of(resultFuture.get());
|
||||
} catch (Exception e) {
|
||||
long now = System.currentTimeMillis();
|
||||
logger.debug("creating exceptional scenario result from getAsyncResultStatus");
|
||||
oResult = Optional.of(new ScenarioResult(e, now, now));
|
||||
}
|
||||
}
|
||||
@@ -184,6 +190,7 @@ public class ScenariosExecutor {
|
||||
try {
|
||||
return Optional.ofNullable(resultFuture1.get());
|
||||
} catch (Exception e) {
|
||||
logger.debug("creating exceptional scenario result from getPendingResult");
|
||||
return Optional.of(new ScenarioResult(e, now, now));
|
||||
}
|
||||
} else if (resultFuture1.isCancelled()) {
|
||||
|
||||
Reference in New Issue
Block a user