Merge branch 'main' into snyk-upgrade-fa24252251d9c2ec2647704da5f13b2e

This commit is contained in:
Jonathan Shook
2023-01-17 11:57:09 -06:00
committed by GitHub
6 changed files with 158 additions and 138 deletions

View File

@@ -31,6 +31,11 @@ jobs:
- name: Installing dependencies
run: mvn clean install -DskipTests=true -Dmaven.javadoc.skip=true -B -V
- name: Running tests
run: mvn -B test
- name: Collecting reports
run: tar -cvf codecov-report.tar target/coverage-report/**/*
- name: Uploading [nosqlbench] test coverage
uses: actions/upload-artifact@v3

View File

@@ -44,7 +44,7 @@
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy</artifactId>
<version>3.0.13</version>
<version>3.0.14</version>
</dependency>
<dependency>

View File

@@ -48,7 +48,6 @@ import java.util.stream.Collectors;
* <P>Some basic rules and invariants must be observed for consistent concurrent behavior.
* Any state changes for a Motor must be made through {@link Motor#getState()}.
* This allows the state tracking to work consistently for all observers.</p>
*
*/
public class ActivityExecutor implements ActivityController, ParameterMap.Listener, ProgressCapable, Callable<ExecutionResult> {
@@ -91,20 +90,20 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
tally.awaitNoneOther(RunState.Stopped, RunState.Finished);
shutdownExecutorService(Integer.MAX_VALUE);
tally.awaitNoneOther(RunState.Stopped,RunState.Finished);
tally.awaitNoneOther(RunState.Stopped, RunState.Finished);
activity.setRunState(RunState.Stopped);
logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.interval(this.startedAt, this.stoppedAt)
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
.session(sessionId)
.interval(this.startedAt, this.stoppedAt)
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
);
}
@@ -118,20 +117,20 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
motors.forEach(Motor::requestStop);
shutdownExecutorService(Integer.MAX_VALUE);
tally.awaitNoneOther(RunState.Stopped,RunState.Finished);
tally.awaitNoneOther(RunState.Stopped, RunState.Finished);
activity.setRunState(RunState.Stopped);
logger.info(() -> "stopped: " + this.getActivityDef().getAlias() + " with " + motors.size() + " slots");
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.interval(this.startedAt, this.stoppedAt)
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
.session(sessionId)
.interval(this.startedAt, this.stoppedAt)
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
);
}
@@ -211,10 +210,10 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
adjustMotorCountToThreadParam(activity.getActivityDef());
}
motors.stream()
.filter(m -> (m instanceof ActivityDefObserver))
.filter(m -> (m instanceof ActivityDefObserver))
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Uninitialized)
// .filter(m -> m.getSlotStateTracker().getSlotState() != RunState.Starting)
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
.forEach(m -> ((ActivityDefObserver) m).onActivityDefUpdate(activityDef));
}
}
@@ -228,8 +227,8 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private String getSlotStatus() {
return motors.stream()
.map(m -> m.getState().get().getCode())
.collect(Collectors.joining(",", "[", "]"));
.map(m -> m.getState().get().getCode())
.collect(Collectors.joining(",", "[", "]"));
}
/**
@@ -286,17 +285,17 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
case Running:
case Starting:
motors.stream()
.filter(m -> m.getState().get() != RunState.Running)
.filter(m -> m.getState().get() != RunState.Finished)
.filter(m -> m.getState().get() != RunState.Starting)
.forEach(m -> {
executorService.execute(m);
});
.filter(m -> m.getState().get() != RunState.Running)
.filter(m -> m.getState().get() != RunState.Finished)
.filter(m -> m.getState().get() != RunState.Starting)
.forEach(m -> {
executorService.execute(m);
});
break;
case Stopped:
motors.stream()
.filter(m -> m.getState().get() != RunState.Stopped)
.forEach(Motor::requestStop);
.filter(m -> m.getState().get() != RunState.Stopped)
.forEach(Motor::requestStop);
break;
case Finished:
case Stopping:
@@ -309,7 +308,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private void awaitAlignmentOfMotorStateToActivityState() {
logger.debug(()->"awaiting state alignment from " + activity.getRunState());
logger.debug(() -> "awaiting state alignment from " + activity.getRunState());
switch (activity.getRunState()) {
case Starting:
case Running:
@@ -412,7 +411,7 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
private void awaitMotorsAtLeastRunning() {
RunStateImage states = tally.awaitAny(RunState.Running, RunState.Stopped, RunState.Finished, RunState.Errored);
RunState maxState = states.getMaxState();
if (maxState==RunState.Errored) {
if (maxState == RunState.Errored) {
activity.setRunState(maxState);
throw new RuntimeException("Error in activity");
}
@@ -463,17 +462,17 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
RunStateImage state = tally.awaitNoneOther(RunState.Stopped, RunState.Finished, RunState.Errored);
RunState maxState = state.getMaxState();
activity.setRunState(maxState);
if (maxState==RunState.Errored) {
if (maxState == RunState.Errored) {
throw new RuntimeException("Error while waiting for activity completion:" + this.exception);
}
}
private void startMotorExecutorService() {
this.executorService = new ThreadPoolExecutor(
0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS,
new SynchronousQueue<>(),
new IndexedThreadFactory(activity.getAlias(), new ActivityExceptionHandler(this))
);
}
@@ -490,14 +489,14 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
logger.info(() -> "starting activity " + activity.getAlias() + " for cycles " + activity.getCycleSummary());
Annotators.recordAnnotation(Annotation.newBuilder()
.session(sessionId)
.now()
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
.session(sessionId)
.now()
.layer(Layer.Activity)
.label("alias", getActivityDef().getAlias())
.label("driver", getActivityDef().getActivityType())
.label("workload", getActivityDef().getParams().getOptionalString("workload").orElse("none"))
.detail("params", getActivityDef().toString())
.build()
);
activitylogger.debug("START/before alias=(" + activity.getAlias() + ")");
@@ -508,11 +507,11 @@ public class ActivityExecutor implements ActivityController, ParameterMap.Listen
activity.onActivityDefUpdate(activityDef);
} catch (Exception e) {
this.exception = new RuntimeException("Error initializing activity '" + activity.getAlias() + "':\n" + e.getMessage(), e);
activitylogger.error(()->"error initializing activity '" + activity.getAlias() + "': " + exception);
activitylogger.error(() -> "error initializing activity '" + activity.getAlias() + "': " + exception);
throw new RuntimeException(exception);
}
adjustMotorCountToThreadParam(activity.getActivityDef());
tally.awaitAny(RunState.Running,RunState.Finished,RunState.Stopped);
tally.awaitAny(RunState.Running, RunState.Finished, RunState.Stopped);
activity.setRunState(RunState.Running);
activitylogger.debug("START/after alias=(" + activity.getAlias() + ")");
}

View File

@@ -35,136 +35,152 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
public class ActivityExecutorTest {
class ActivityExecutorTest {
private static final Logger logger = LogManager.getLogger(ActivityExecutorTest.class);
@Test
public synchronized void testRestart() {
ActivityDef ad = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;op=initdelay:initdelay=5000;");
Optional<ActivityType> activityType = new ActivityTypeLoader().load(ad);
Activity a = new DelayedInitActivity(ad);
InputDispenser idisp = new CoreInputDispenser(a);
ActionDispenser adisp = new CoreActionDispenser(a);
OutputDispenser tdisp = CoreServices.getOutputDispenser(a).orElse(null);
MotorDispenser<?> mdisp = new CoreMotorDispenser(a, idisp, adisp, tdisp);
a.setActionDispenserDelegate(adisp);
a.setOutputDispenserDelegate(tdisp);
a.setInputDispenserDelegate(idisp);
a.setMotorDispenserDelegate(mdisp);
ExecutorService executor = Executors.newCachedThreadPool();
ActivityExecutor ae = new ActivityExecutor(a, "test-restart");
Future<ExecutionResult> future = executor.submit(ae);
synchronized void testRestart() {
ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;op=initdelay:initdelay=5000;");
new ActivityTypeLoader().load(activityDef);
final Activity activity = new DelayedInitActivity(activityDef);
InputDispenser inputDispenser = new CoreInputDispenser(activity);
ActionDispenser adisp = new CoreActionDispenser(activity);
OutputDispenser tdisp = CoreServices.getOutputDispenser(activity).orElse(null);
final MotorDispenser<?> mdisp = new CoreMotorDispenser(activity, inputDispenser, adisp, tdisp);
activity.setActionDispenserDelegate(adisp);
activity.setOutputDispenserDelegate(tdisp);
activity.setInputDispenserDelegate(inputDispenser);
activity.setMotorDispenserDelegate(mdisp);
final ExecutorService executor = Executors.newCachedThreadPool();
ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-restart");
final Future<ExecutionResult> future = executor.submit(activityExecutor);
try {
ad.setThreads(1);
ae.startActivity();
ae.stopActivity();
ae.startActivity();
ae.startActivity();
ExecutionResult executionResult = future.get();
activityDef.setThreads(1);
activityExecutor.startActivity();
activityExecutor.stopActivity();
activityExecutor.startActivity();
activityExecutor.startActivity();
future.get();
Thread.sleep(500L);
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.print("ad.setThreads(1)");
executor.shutdown();
assertThat(idisp.getInput(10).getInputSegment(3)).isNull();
assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull();
}
@Test
public synchronized void testDelayedStartSanity() {
ActivityDef ad = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;initdelay=5000;");
Optional<ActivityType> activityType = new ActivityTypeLoader().load(ad);
Activity a = new DelayedInitActivity(ad);
InputDispenser idisp = new CoreInputDispenser(a);
ActionDispenser adisp = new CoreActionDispenser(a);
OutputDispenser tdisp = CoreServices.getOutputDispenser(a).orElse(null);
MotorDispenser<?> mdisp = new CoreMotorDispenser(a, idisp, adisp, tdisp);
a.setActionDispenserDelegate(adisp);
a.setOutputDispenserDelegate(tdisp);
a.setInputDispenserDelegate(idisp);
a.setMotorDispenserDelegate(mdisp);
synchronized void testDelayedStartSanity() {
final ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;initdelay=5000;");
new ActivityTypeLoader().load(activityDef);
final Activity activity = new DelayedInitActivity(activityDef);
InputDispenser inputDispenser = new CoreInputDispenser(activity);
ActionDispenser actionDispenser = new CoreActionDispenser(activity);
OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null);
final MotorDispenser<?> motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser);
activity.setActionDispenserDelegate(actionDispenser);
activity.setOutputDispenserDelegate(outputDispenser);
activity.setInputDispenserDelegate(inputDispenser);
activity.setMotorDispenserDelegate(motorDispenser);
final ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-delayed-start");
final ExecutorService testExecutor = Executors.newCachedThreadPool();
final Future<ExecutionResult> future = testExecutor.submit(activityExecutor);
ActivityExecutor ae = new ActivityExecutor(a, "test-delayed-start");
ExecutorService testExecutor = Executors.newCachedThreadPool();
Future<ExecutionResult> future = testExecutor.submit(ae);
try {
ad.setThreads(1);
ae.startActivity();
ExecutionResult result = future.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
throw new RuntimeException(e);
}
testExecutor.shutdownNow();
assertThat(idisp.getInput(10).getInputSegment(3)).isNull();
activityDef.setThreads(1);
activityExecutor.startActivity();
future.get();
testExecutor.shutdownNow();
} catch (Exception e) {
fail("Unexpected exception", e);
}
assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull();
}
@Test
public synchronized void testNewActivityExecutor() {
ActivityDef ad = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;");
Optional<ActivityType> activityType = new ActivityTypeLoader().load(ad);
Input longSupplier = new AtomicInput(ad);
MotorDispenser<?> cmf = getActivityMotorFactory(
ad, motorActionDelay(999), longSupplier
);
Activity a = new SimpleActivity(ad);
InputDispenser idisp = new CoreInputDispenser(a);
ActionDispenser adisp = new CoreActionDispenser(a);
OutputDispenser tdisp = CoreServices.getOutputDispenser(a).orElse(null);
MotorDispenser<?> mdisp = new CoreMotorDispenser(a, idisp, adisp, tdisp);
a.setActionDispenserDelegate(adisp);
a.setInputDispenserDelegate(idisp);
a.setMotorDispenserDelegate(mdisp);
synchronized void testNewActivityExecutor() {
ActivityExecutor ae = new ActivityExecutor(a, "test-new-executor");
ad.setThreads(5);
ae.startActivity();
ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test;cycles=1000;initdelay=5000;");
new ActivityTypeLoader().load(activityDef);
getActivityMotorFactory(motorActionDelay(999), new AtomicInput(activityDef));
final Activity simpleActivity = new SimpleActivity(activityDef);
InputDispenser inputDispenser = new CoreInputDispenser(simpleActivity);
ActionDispenser actionDispenser = new CoreActionDispenser(simpleActivity);
OutputDispenser outputDispenser = CoreServices.getOutputDispenser(simpleActivity).orElse(null);
final MotorDispenser<?> motorDispenser = new CoreMotorDispenser<>(simpleActivity,
inputDispenser, actionDispenser, outputDispenser);
simpleActivity.setActionDispenserDelegate(actionDispenser);
simpleActivity.setInputDispenserDelegate(inputDispenser);
simpleActivity.setMotorDispenserDelegate(motorDispenser);
final ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity, "test-new-executor");
activityDef.setThreads(5);
activityExecutor.startActivity();
int[] speeds = new int[]{1, 2000, 5, 2000, 2, 2000};
for (int offset = 0; offset < speeds.length; offset += 2) {
int threadTarget = speeds[offset];
int threadTime = speeds[offset + 1];
logger.info(() -> "Setting thread level to " + threadTarget + " for " + threadTime + " seconds.");
ad.setThreads(threadTarget);
logger.debug(() -> "Setting thread level to " + threadTarget + " for " + threadTime + " seconds.");
activityDef.setThreads(threadTarget);
try {
Thread.sleep(threadTime);
} catch (InterruptedException ignored) {
} catch (Exception e) {
fail("Not expecting exception", e);
}
}
ad.setThreads(0);
// Used for slowing the roll due to state transitions in test.
try {
activityExecutor.stopActivity();
Thread.sleep(2000L);
} catch (Exception e) {
fail("Not expecting exception", e);
}
}
private MotorDispenser<?> getActivityMotorFactory(final ActivityDef ad, Action lc, final Input ls) {
MotorDispenser<?> cmf = new MotorDispenser<>() {
private MotorDispenser<?> getActivityMotorFactory(Action lc, final Input ls) {
return new MotorDispenser<>() {
@Override
public Motor getMotor(ActivityDef activityDef, int slotId) {
Activity activity = new SimpleActivity(activityDef);
Motor<?> cm = new CoreMotor(activity, slotId, ls);
Motor<?> cm = new CoreMotor<>(activity, slotId, ls);
cm.setAction(lc);
return cm;
}
};
return cmf;
}
private SyncAction motorActionDelay(final long delay) {
SyncAction consumer = new SyncAction() {
return new SyncAction() {
@Override
public int runCycle(long cycle) {
System.out.println("consuming " + cycle + ", delaying:" + delay);
logger.info(() -> "consuming " + cycle + ", delaying:" + delay);
try {
Thread.sleep(delay);
} catch (InterruptedException ignored) {
@@ -172,7 +188,7 @@ public class ActivityExecutorTest {
return 0;
}
};
return consumer;
}
private static class DelayedInitActivity extends SimpleActivity {
@@ -184,13 +200,13 @@ public class ActivityExecutorTest {
@Override
public void initActivity() {
Integer initdelay = activityDef.getParams().getOptionalInteger("initdelay").orElse(0);
logger.info(() -> "delaying for " + initdelay);
Integer initDelay = activityDef.getParams().getOptionalInteger("initdelay").orElse(0);
logger.info(() -> "delaying for " + initDelay);
try {
Thread.sleep(initdelay);
Thread.sleep(initDelay);
} catch (InterruptedException ignored) {
}
logger.info(() -> "delayed for " + initdelay);
logger.info(() -> "delayed for " + initDelay);
}
}
}

View File

@@ -351,23 +351,23 @@
<dependency>
<groupId>org.graalvm.sdk</groupId>
<artifactId>graal-sdk</artifactId>
<version>22.2.0</version>
<version>22.3.0</version>
</dependency>
<dependency>
<groupId>org.graalvm.js</groupId>
<artifactId>js</artifactId>
<version>22.2.0</version>
<version>22.3.0</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.graalvm.js</groupId>
<artifactId>js-scriptengine</artifactId>
<version>22.2.0</version>
<version>22.3.0</version>
</dependency>
<dependency>
<groupId>org.graalvm.tools</groupId>
<artifactId>profiler</artifactId>
<version>22.2.0</version>
<version>22.3.0</version>
<scope>runtime</scope>
</dependency>
<dependency>

View File

@@ -75,7 +75,7 @@ public class ActivityDef implements NBNamedElement {
ActivityDef activityDef = new ActivityDef(activityParameterMap.orElseThrow(
() -> new RuntimeException("Unable to parse:" + namedActivitySpec)
));
logger.debug("parsed activityDef " + namedActivitySpec + " to-> " + activityDef);
logger.info("parsed activityDef " + namedActivitySpec + " to-> " + activityDef);
return activityDef;
}