diff --git a/mvn-defaults/pom.xml b/mvn-defaults/pom.xml index 4c6633edc..1704c21af 100644 --- a/mvn-defaults/pom.xml +++ b/mvn-defaults/pom.xml @@ -515,6 +515,7 @@ 21 21 21 + true @@ -727,7 +728,7 @@ org.apache.maven.plugins maven-compiler-plugin - 3.10.1 + 3.11.0 org.apache.maven.plugins diff --git a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/virtualized/ProveOutAsyncStructuredConcurrencyTest.java b/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/virtualized/ProveOutAsyncStructuredConcurrencyTest.java new file mode 100644 index 000000000..6034389a9 --- /dev/null +++ b/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/virtualized/ProveOutAsyncStructuredConcurrencyTest.java @@ -0,0 +1,183 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.engine.core.virtualized; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.StructuredTaskScope; +import java.util.regex.Pattern; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + *

Structured Asynchronous Execution

+ *

This example shows that it is possible to combine structured concurrency support with + * asynchronous tasks which run within the context of a StructuredTaskScope.

+ * + *

The normative usage of StructuredTaskScope is documented as an ordered pattern of + *

    + *
  1. initializing the scope
  2. + *
  3. (and then) forking subtasks
  4. + *
  5. (and then) joining subtasks
  6. + *
+ * Asynchronous task structure deviates from this because it is not known what the explicit subtask structure + * will be, or when subtasks will be forked into the context. In this scenario, the lifetime of the task scope is + * not strictly bounded to a set of predetermined, subtasks and thus must be closed by some other + * intentional side-effect. + *

+ * + *

This can be supported within the lifetime of the task scope as long as some pending task keeps the scope open + * until it is needed to be closed. For this, a lifeline task is used as a proxy for the lifetime of the + * task scope.

+ *

+ * The distinct error handling flows are tested independently and documented within the examples below. + */ +public class ProveOutAsyncStructuredConcurrencyTest { + + /** + *

Normal execution

+ *
    + *
  1. (exit on error) task scope init
  2. + *
  3. lifetime task starts
  4. + *
  5. multiple subtasks are submitted and run to completion with no exceptions
  6. + *
  7. lifetime task stops with no exception
  8. + *
  9. task scope completes with no exception
  10. + *
+ */ + @Test + public void testStructuredConcurrencyHappyPath() { + StructuredTaskScope.ShutdownOnFailure sts = new StructuredTaskScope.ShutdownOnFailure(); + sts.fork(() -> { + for (int i = 0; i < 10; i++) { + System.out.println("lifeline thread at " + i); +// if (i==9) { +// throw new RuntimeException("failure on 9"); +// } + + Thread.sleep(i + 1000); + } + return 10; + }); + for (int i = 0; i < 10; i++) { + int finalI = i; + + StructuredTaskScope.Subtask st = sts.fork(() -> { + Thread.sleep(finalI * 500); + System.out.println("i:" + finalI); +// if (finalI==9) { +// throw new RuntimeException("failure on 9"); +// } + return finalI; + } + ); + } + try { + sts.join(); + } catch (Exception e) { + throw new RuntimeException(e); + } + assertThat(sts.exception()).isEmpty(); + } + + /** + *

Lifecycle Error

+ *
    + *
  1. (exit on error) task scope init
  2. + *
  3. lifetime task starts
  4. + *
  5. multiple subtasks are submitted and run to completion with no exceptions
  6. + *
  7. lifetime task stops WITH exception
  8. + *
  9. task scope completes with (lifetime task) exception
  10. + *
+ */ + @Test + public void testStructuredConcurrencyLifetimeTaskError() { + StructuredTaskScope.ShutdownOnFailure sts = new StructuredTaskScope.ShutdownOnFailure(); + sts.fork(() -> { + for (int i = 0; i < 10; i++) { + System.out.println("lifeline thread at " + i); + if (i == 9) { + throw new RuntimeException("lifetime failure on 9"); + } + + Thread.sleep(i + 1000); + } + return 10; + }); + for (int i = 0; i < 10; i++) { + int finalI = i; + + StructuredTaskScope.Subtask st = sts.fork(() -> { + Thread.sleep(finalI * 500); + System.out.println("i:" + finalI); + return finalI; + } + ); + } + try { + sts.join(); + } catch (Exception e) { + throw new RuntimeException(e); + } + assertThat(sts.exception()).isPresent(); + assertThat(sts.exception().toString()).matches(Pattern.compile(".*lifetime failure.*")); + + } + + + /** + *

Subtask Error

+ *
    + *
  1. (exit on error) task scope init
  2. + *
  3. lifetime task starts
  4. + *
  5. at least one subtasks is submitted which throws an exception
  6. + *
  7. lifetime task stops WITH exception
  8. + *
  9. task scope completes with (subtask) exception
  10. + *
+ */ + @Test + public void testStructuredConcurrencySubtaskError() { + StructuredTaskScope.ShutdownOnFailure sts = new StructuredTaskScope.ShutdownOnFailure(); + sts.fork(() -> { + for (int i = 0; i < 10; i++) { + System.out.println("lifeline thread at " + i); + Thread.sleep(i + 1000); + } + return 10; + }); + for (int i = 0; i < 10; i++) { + int finalI = i; + + StructuredTaskScope.Subtask st = sts.fork(() -> { + Thread.sleep(finalI * 500); + System.out.println("i:" + finalI); + if (finalI == 9) { + throw new RuntimeException("subtask failure on 9"); + } + return finalI; + } + ); + } + try { + sts.join(); + } catch (Exception e) { + throw new RuntimeException(e); + } + assertThat(sts.exception()).isPresent(); + assertThat(sts.exception().toString()).matches(Pattern.compile(".*subtask failure.*")); + } + +}