From a57e96793010e50323007fa3177a5805978e72dc Mon Sep 17 00:00:00 2001 From: Jonathan Shook Date: Wed, 27 Sep 2023 12:47:06 -0500 Subject: [PATCH] provide threads metric --- .../api/metrics/ActivityMetricsTest.java | 5 ++ .../lifecycle/activity/ActivityExecutor.java | 24 +++--- .../engine/core/ActivityExecutorTest.java | 83 ++++++++++--------- .../api/engine/metrics/ActivityMetrics.java | 7 +- 4 files changed, 65 insertions(+), 54 deletions(-) diff --git a/engine-api/src/test/java/io/nosqlbench/engine/api/metrics/ActivityMetricsTest.java b/engine-api/src/test/java/io/nosqlbench/engine/api/metrics/ActivityMetricsTest.java index abf5cc22c..fae9e7b3d 100644 --- a/engine-api/src/test/java/io/nosqlbench/engine/api/metrics/ActivityMetricsTest.java +++ b/engine-api/src/test/java/io/nosqlbench/engine/api/metrics/ActivityMetricsTest.java @@ -55,4 +55,9 @@ public class ActivityMetricsTest { } + @Test + public void testSanityCheckSanitize() { + String result = ActivityMetrics.sanitize("test-dynamic-params.input_cycles_first"); + assertThat(result).isEqualTo("test_dynamic_params__input_cycles_first"); + } } diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java index d494b1480..949d47f02 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java @@ -413,6 +413,7 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P // before threads start running such as metrics instruments activity.initActivity(); startMotorExecutorService(); + registerMetrics(); startRunningActivityThreads(); awaitMotorsAtLeastRunning(); logger.debug("STARTED " + activityDef.getAlias()); @@ -421,6 +422,7 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P this.exception = e; } finally { stoppedAt=System.currentTimeMillis(); + unregisterMetrics(); activity.shutdownActivity(); activity.closeAutoCloseables(); ExecutionResult result = new ExecutionResult(startedAt, stoppedAt, "", exception); @@ -446,19 +448,18 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P } } - public synchronized void startActivity() { - RunStateImage startable = tally.awaitNoneOther(1000L, RunState.Uninitialized, RunState.Stopped); - if (startable.isTimeout()) { - throw new RuntimeException("Unable to start activity '" + getActivity().getAlias() + "' which is in state " + startable); - } - startMotorExecutorService(); - startRunningActivityThreads(); - awaitMotorsAtLeastRunning(); - registerMetrics(); - } +// public synchronized void startActivity() { +// RunStateImage startable = tally.awaitNoneOther(1000L, RunState.Uninitialized, RunState.Stopped); +// if (startable.isTimeout()) { +// throw new RuntimeException("Unable to start activity '" + getActivity().getAlias() + "' which is in state " + startable); +// } +// startMotorExecutorService(); +// startRunningActivityThreads(); +// awaitMotorsAtLeastRunning(); +// } private void registerMetrics() { - this.threadsGauge= threadsGauge = ActivityMetrics.register(new NBFunctionGauge(activity, () -> (double) this.motors.size(), "threads")); + this.threadsGauge = ActivityMetrics.register(new NBFunctionGauge(activity, () -> (double) this.motors.size(), "threads")); } private void unregisterMetrics() { ActivityMetrics.unregister(this.threadsGauge); @@ -485,7 +486,6 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P } finally { logger.trace(() -> "finally shutting down activity " + this.getActivity().getAlias()); this.stoppedAt = System.currentTimeMillis(); - unregisterMetrics(); activity.setRunState(RunState.Stopped); } diff --git a/engine-core/src/test/java/io/nosqlbench/engine/core/ActivityExecutorTest.java b/engine-core/src/test/java/io/nosqlbench/engine/core/ActivityExecutorTest.java index f00d25731..345ab3d40 100644 --- a/engine-core/src/test/java/io/nosqlbench/engine/core/ActivityExecutorTest.java +++ b/engine-core/src/test/java/io/nosqlbench/engine/core/ActivityExecutorTest.java @@ -16,8 +16,8 @@ package io.nosqlbench.engine.core; -import io.nosqlbench.api.labels.NBLabeledElement; import io.nosqlbench.api.engine.activityimpl.ActivityDef; +import io.nosqlbench.api.labels.NBLabeledElement; import io.nosqlbench.engine.api.activityapi.core.*; import io.nosqlbench.engine.api.activityapi.input.Input; import io.nosqlbench.engine.api.activityapi.input.InputDispenser; @@ -37,11 +37,9 @@ import org.apache.logging.log4j.Logger; import org.junit.jupiter.api.Test; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; -import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.fail; class ActivityExecutorTest { @@ -85,41 +83,41 @@ class ActivityExecutorTest { // // } - @Test - synchronized void testDelayedStartSanity() { - - ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;"); - new ActivityTypeLoader().load(activityDef, NBLabeledElement.EMPTY); - - Activity activity = new DelayedInitActivity(activityDef); - final InputDispenser inputDispenser = new CoreInputDispenser(activity); - final ActionDispenser actionDispenser = new CoreActionDispenser(activity); - final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null); - - MotorDispenser motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser); - activity.setActionDispenserDelegate(actionDispenser); - activity.setOutputDispenserDelegate(outputDispenser); - activity.setInputDispenserDelegate(inputDispenser); - activity.setMotorDispenserDelegate(motorDispenser); - - ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-delayed-start"); - - ExecutorService testExecutor = Executors.newCachedThreadPool(); - Future future = testExecutor.submit(activityExecutor); - - - try { - activityDef.setThreads(1); - activityExecutor.startActivity(); - future.get(); - testExecutor.shutdownNow(); - - } catch (final Exception e) { - fail("Unexpected exception", e); - } - - assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull(); - } +// @Test +// synchronized void testDelayedStartSanity() { +// +// ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;"); +// new ActivityTypeLoader().load(activityDef, NBLabeledElement.EMPTY); +// +// Activity activity = new DelayedInitActivity(activityDef); +// final InputDispenser inputDispenser = new CoreInputDispenser(activity); +// final ActionDispenser actionDispenser = new CoreActionDispenser(activity); +// final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null); +// +// MotorDispenser motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser); +// activity.setActionDispenserDelegate(actionDispenser); +// activity.setOutputDispenserDelegate(outputDispenser); +// activity.setInputDispenserDelegate(inputDispenser); +// activity.setMotorDispenserDelegate(motorDispenser); +// +// ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-delayed-start"); +// +// ExecutorService testExecutor = Executors.newCachedThreadPool(); +// Future future = testExecutor.submit(activityExecutor); +// +// +// try { +// activityDef.setThreads(1); +// activityExecutor.startActivity(); +// future.get(); +// testExecutor.shutdownNow(); +// +// } catch (final Exception e) { +// fail("Unexpected exception", e); +// } +// +// assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull(); +// } @Test synchronized void testNewActivityExecutor() { @@ -144,7 +142,9 @@ class ActivityExecutorTest { ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity, "test-new-executor"); activityDef.setThreads(5); - activityExecutor.startActivity(); + ForkJoinTask executionResultForkJoinTask = ForkJoinPool.commonPool().submit(activityExecutor); + +// activityExecutor.startActivity(); final int[] speeds = {1, 50, 5, 50, 2, 50}; for (int offset = 0; offset < speeds.length; offset += 2) { @@ -160,6 +160,7 @@ class ActivityExecutorTest { fail("Not expecting exception", e); } } + executionResultForkJoinTask.cancel(true); // Used for slowing the roll due to state transitions in test. try { diff --git a/nb-api/src/main/java/io/nosqlbench/api/engine/metrics/ActivityMetrics.java b/nb-api/src/main/java/io/nosqlbench/api/engine/metrics/ActivityMetrics.java index e917af320..2744a4059 100644 --- a/nb-api/src/main/java/io/nosqlbench/api/engine/metrics/ActivityMetrics.java +++ b/nb-api/src/main/java/io/nosqlbench/api/engine/metrics/ActivityMetrics.java @@ -34,6 +34,7 @@ import java.util.DoubleSummaryStatistics; import java.util.List; import java.util.ServiceLoader; import java.util.concurrent.TimeUnit; +import java.util.prefs.BackingStoreException; import java.util.regex.Pattern; public class ActivityMetrics { @@ -116,6 +117,10 @@ public class ActivityMetrics { labels = labelValidator != null ? labelValidator.apply(labels) : labels; final String graphiteName = labels.linearizeValues('.', "[activity]", "[space]", "[op]", "name"); +// String sanitized = sanitize(graphiteName); +// if (!graphiteName.equals(sanitized)) { +// throw new RuntimeException("Attempted to register a metric which was not compatible with labeled metric forms. Submitted as '" + graphiteName + "', but should likely be '" + sanitized + "'"); +// } Metric metric = get().getMetrics().get(graphiteName); metric = get().getMetrics().get(graphiteName); @@ -420,7 +425,7 @@ public class ActivityMetrics { public static String sanitize(String word) { String sanitized = word; - sanitized = sanitized.replaceAll("\\..+$", ""); + sanitized = sanitized.replaceAll("\\.", "__"); sanitized = sanitized.replaceAll("-", "_"); sanitized = sanitized.replaceAll("[^a-zA-Z0-9_]+", "");