stdout activity should not close stdout fd

This commit is contained in:
Jonathan Shook 2020-08-07 14:48:23 -05:00
parent 45e77f8db8
commit 637fbba985
2 changed files with 41 additions and 41 deletions

View File

@ -17,18 +17,16 @@
package io.nosqlbench.activitytype.stdout;
import io.nosqlbench.engine.api.activityconfig.ParsedStmt;
import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtDef;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDoc;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Timer;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
import io.nosqlbench.engine.api.activityconfig.ParsedStmt;
import io.nosqlbench.engine.api.activityconfig.StatementsLoader;
import io.nosqlbench.engine.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.engine.api.activityconfig.yaml.StmtsDocList;
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
@ -47,7 +45,6 @@ import java.io.Writer;
import java.util.*;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@SuppressWarnings("Duplicates")
public class StdoutActivity extends SimpleActivity implements ActivityDefObserver {
@ -84,7 +81,10 @@ public class StdoutActivity extends SimpleActivity implements ActivityDefObserve
public void shutdownActivity() {
try {
if (pw != null) {
pw.close();
if (!fileName.toLowerCase().equals("stdout")) {
logger.trace("Closing non-stdout output stream.");
pw.close();
}
}
} catch (Exception e) {
logger.warn("error closing writer:" + e, e);

View File

@ -65,10 +65,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
this.activity = activity;
this.activityDef = activity.getActivityDef();
executorService = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
);
activity.getActivityDef().getParams().addListener(this);
activity.setActivityController(this);
@ -225,10 +225,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
adjustToActivityDef(activity.getActivityDef());
}
motors.stream()
.filter(m -> (m instanceof ActivityDefObserver))
.filter(m -> (m instanceof ActivityDefObserver))
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Uninitialized)
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
}
}
@ -251,7 +251,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
activitylogger.debug("AWAIT-FINISH/exception alias=(" + activity.getAlias() + ")");
throw stoppingException;
}
activitylogger.debug("AWAIT-FINISH/afte alias=(" + activity.getAlias() + ")");
activitylogger.debug("AWAIT-FINISH/after alias=(" + activity.getAlias() + ")");
return awaited;
}
@ -261,8 +261,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private String getSlotStatus() {
return motors.stream()
.map(m -> m.getSlotStateTracker().getSlotState().getCode())
.collect(Collectors.joining(",", "[", "]"));
.map(m -> m.getSlotStateTracker().getSlotState().getCode())
.collect(Collectors.joining(",", "[", "]"));
}
/**
@ -305,18 +305,18 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
case Running:
case Starting:
motors.stream()
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Running)
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Finished)
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
.forEach(m -> {
m.getSlotStateTracker().enterState(RunState.Starting);
executorService.execute(m);
});
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Running)
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Finished)
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
.forEach(m -> {
m.getSlotStateTracker().enterState(RunState.Starting);
executorService.execute(m);
});
break;
case Stopped:
motors.stream()
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Stopped)
.forEach(Motor::requestStop);
.filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Stopped)
.forEach(Motor::requestStop);
break;
case Finished:
case Stopping:
@ -354,9 +354,9 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
/**
* Await a thread (aka motor/slot) entering a specific SlotState
*
* @param m motor instance
* @param waitTime milliseconds to wait, total
* @param pollTime polling interval between state checks
* @param m motor instance
* @param waitTime milliseconds to wait, total
* @param pollTime polling interval between state checks
* @param desiredRunStates any desired SlotState
* @return true, if the desired SlotState was detected
*/
@ -365,7 +365,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
long startedAt = System.currentTimeMillis();
while (System.currentTimeMillis() < (startedAt + waitTime)) {
Map<RunState,Integer> actualStates = new HashMap<>();
Map<RunState, Integer> actualStates = new HashMap<>();
for (RunState state : desiredRunStates) {
actualStates.compute(state, (k, v) -> (v == null ? 0 : v) + 1);
}
@ -373,7 +373,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
actualStates.remove(desiredRunState);
}
logger.trace("state of remaining slots:" + actualStates.toString());
if (actualStates.size()==0) {
if (actualStates.size() == 0) {
return true;
} else {
System.out.println("motor states:" + actualStates.toString());
@ -397,7 +397,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
awaited = awaitMotorState(motor, waitTime, pollTime, awaitingState);
if (!awaited) {
logger.trace("failed awaiting motor " + motor.getSlotId() + " for state in " +
Arrays.asList(awaitingState));
Arrays.asList(awaitingState));
break;
}
}
@ -441,8 +441,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
boolean awaitedRequiredState = awaitMotorState(m, waitTime, pollTime, awaitingState);
if (!awaitedRequiredState) {
String error = "Unable to await " + activityDef.getAlias() +
"/Motor[" + m.getSlotId() + "]: from state " + startingState + " to " + m.getSlotStateTracker().getSlotState()
+ " after waiting for " + waitTime + "ms";
"/Motor[" + m.getSlotId() + "]: from state " + startingState + " to " + m.getSlotStateTracker().getSlotState()
+ " after waiting for " + waitTime + "ms";
RuntimeException e = new RuntimeException(error);
logger.error(error);
throw e;
@ -468,9 +468,9 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
@Override
public synchronized double getProgress() {
ArrayList<Input> inputs = motors.stream()
.map(Motor::getInput)
.distinct()
.collect(Collectors.toCollection(ArrayList::new));
.map(Motor::getInput)
.distinct()
.collect(Collectors.toCollection(ArrayList::new));
double startCycle = getActivityDef().getStartCycle();
double endCycle = getActivityDef().getEndCycle();
@ -497,8 +497,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
@Override
public String getProgressDetails() {
return motors.stream().map(Motor::getInput).distinct().findFirst()
.filter(i -> i instanceof ProgressCapable)
.map(i -> ((ProgressCapable) i).getProgressDetails()).orElse("");
.filter(i -> i instanceof ProgressCapable)
.map(i -> ((ProgressCapable) i).getProgressDetails()).orElse("");
}
@ -510,8 +510,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
@Override
public RunState getProgressState() {
Optional<RunState> first = motors.stream()
.map(Motor::getSlotStateTracker).map(SlotStateTracker::getSlotState)
.distinct().sorted().findFirst();
.map(Motor::getSlotStateTracker).map(SlotStateTracker::getSlotState)
.distinct().sorted().findFirst();
return first.orElse(RunState.Uninitialized);
}