From 674b26121f0c3addec07bf6783de29b3389df588 Mon Sep 17 00:00:00 2001
From: Jonathan Shook
Date: Mon, 20 May 2024 19:21:08 -0500
Subject: [PATCH] prove out async structured concurrency examples
update name
fix typos
fix more typos
---
mvn-defaults/pom.xml | 3 +-
...roveOutAsyncStructuredConcurrencyTest.java | 183 ++++++++++++++++++
2 files changed, 185 insertions(+), 1 deletion(-)
create mode 100644 nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/virtualized/ProveOutAsyncStructuredConcurrencyTest.java
diff --git a/mvn-defaults/pom.xml b/mvn-defaults/pom.xml
index 4c6633edc..1704c21af 100644
--- a/mvn-defaults/pom.xml
+++ b/mvn-defaults/pom.xml
@@ -515,6 +515,7 @@
212121
+ true
@@ -727,7 +728,7 @@
org.apache.maven.pluginsmaven-compiler-plugin
- 3.10.1
+ 3.11.0org.apache.maven.plugins
diff --git a/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/virtualized/ProveOutAsyncStructuredConcurrencyTest.java b/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/virtualized/ProveOutAsyncStructuredConcurrencyTest.java
new file mode 100644
index 000000000..6034389a9
--- /dev/null
+++ b/nb-engine/nb-engine-core/src/test/java/io/nosqlbench/engine/core/virtualized/ProveOutAsyncStructuredConcurrencyTest.java
@@ -0,0 +1,183 @@
+/*
+ * Copyright (c) 2024 nosqlbench
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.nosqlbench.engine.core.virtualized;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.StructuredTaskScope;
+import java.util.regex.Pattern;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ *
Structured Asynchronous Execution
+ *
This example shows that it is possible to combine structured concurrency support with
+ * asynchronous tasks which run within the context of a StructuredTaskScope.
+ *
+ *
The normative usage of StructuredTaskScope is documented as an ordered pattern of
+ *
+ *
initializing the scope
+ *
(and then) forking subtasks
+ *
(and then) joining subtasks
+ *
+ * Asynchronous task structure deviates from this because it is not known what the explicit subtask structure
+ * will be, or when subtasks will be forked into the context. In this scenario, the lifetime of the task scope is
+ * not strictly bounded to a set of predetermined, subtasks and thus must be closed by some other
+ * intentional side-effect.
+ *
+ *
+ *
This can be supported within the lifetime of the task scope as long as some pending task keeps the scope open
+ * until it is needed to be closed. For this, a lifeline task is used as a proxy for the lifetime of the
+ * task scope.
+ *
+ * The distinct error handling flows are tested independently and documented within the examples below.
+ */
+public class ProveOutAsyncStructuredConcurrencyTest {
+
+ /**
+ *
Normal execution
+ *
+ *
(exit on error) task scope init
+ *
lifetime task starts
+ *
multiple subtasks are submitted and run to completion with no exceptions
+ *
lifetime task stops with no exception
+ *
task scope completes with no exception
+ *
+ */
+ @Test
+ public void testStructuredConcurrencyHappyPath() {
+ StructuredTaskScope.ShutdownOnFailure sts = new StructuredTaskScope.ShutdownOnFailure();
+ sts.fork(() -> {
+ for (int i = 0; i < 10; i++) {
+ System.out.println("lifeline thread at " + i);
+// if (i==9) {
+// throw new RuntimeException("failure on 9");
+// }
+
+ Thread.sleep(i + 1000);
+ }
+ return 10;
+ });
+ for (int i = 0; i < 10; i++) {
+ int finalI = i;
+
+ StructuredTaskScope.Subtask st = sts.fork(() -> {
+ Thread.sleep(finalI * 500);
+ System.out.println("i:" + finalI);
+// if (finalI==9) {
+// throw new RuntimeException("failure on 9");
+// }
+ return finalI;
+ }
+ );
+ }
+ try {
+ sts.join();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ assertThat(sts.exception()).isEmpty();
+ }
+
+ /**
+ *
Lifecycle Error
+ *
+ *
(exit on error) task scope init
+ *
lifetime task starts
+ *
multiple subtasks are submitted and run to completion with no exceptions
+ *
lifetime task stops WITH exception
+ *
task scope completes with (lifetime task) exception
+ *
+ */
+ @Test
+ public void testStructuredConcurrencyLifetimeTaskError() {
+ StructuredTaskScope.ShutdownOnFailure sts = new StructuredTaskScope.ShutdownOnFailure();
+ sts.fork(() -> {
+ for (int i = 0; i < 10; i++) {
+ System.out.println("lifeline thread at " + i);
+ if (i == 9) {
+ throw new RuntimeException("lifetime failure on 9");
+ }
+
+ Thread.sleep(i + 1000);
+ }
+ return 10;
+ });
+ for (int i = 0; i < 10; i++) {
+ int finalI = i;
+
+ StructuredTaskScope.Subtask st = sts.fork(() -> {
+ Thread.sleep(finalI * 500);
+ System.out.println("i:" + finalI);
+ return finalI;
+ }
+ );
+ }
+ try {
+ sts.join();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ assertThat(sts.exception()).isPresent();
+ assertThat(sts.exception().toString()).matches(Pattern.compile(".*lifetime failure.*"));
+
+ }
+
+
+ /**
+ *
Subtask Error
+ *
+ *
(exit on error) task scope init
+ *
lifetime task starts
+ *
at least one subtasks is submitted which throws an exception
+ *
lifetime task stops WITH exception
+ *
task scope completes with (subtask) exception
+ *
+ */
+ @Test
+ public void testStructuredConcurrencySubtaskError() {
+ StructuredTaskScope.ShutdownOnFailure sts = new StructuredTaskScope.ShutdownOnFailure();
+ sts.fork(() -> {
+ for (int i = 0; i < 10; i++) {
+ System.out.println("lifeline thread at " + i);
+ Thread.sleep(i + 1000);
+ }
+ return 10;
+ });
+ for (int i = 0; i < 10; i++) {
+ int finalI = i;
+
+ StructuredTaskScope.Subtask st = sts.fork(() -> {
+ Thread.sleep(finalI * 500);
+ System.out.println("i:" + finalI);
+ if (finalI == 9) {
+ throw new RuntimeException("subtask failure on 9");
+ }
+ return finalI;
+ }
+ );
+ }
+ try {
+ sts.join();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ assertThat(sts.exception()).isPresent();
+ assertThat(sts.exception().toString()).matches(Pattern.compile(".*subtask failure.*"));
+ }
+
+}