provide threads metric

This commit is contained in:
Jonathan Shook 2023-09-27 12:47:06 -05:00
parent 96269785e2
commit a57e967930
4 changed files with 65 additions and 54 deletions

View File

@ -55,4 +55,9 @@ public class ActivityMetricsTest {
} }
@Test
public void testSanityCheckSanitize() {
String result = ActivityMetrics.sanitize("test-dynamic-params.input_cycles_first");
assertThat(result).isEqualTo("test_dynamic_params__input_cycles_first");
}
} }

View File

@ -413,6 +413,7 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
// before threads start running such as metrics instruments // before threads start running such as metrics instruments
activity.initActivity(); activity.initActivity();
startMotorExecutorService(); startMotorExecutorService();
registerMetrics();
startRunningActivityThreads(); startRunningActivityThreads();
awaitMotorsAtLeastRunning(); awaitMotorsAtLeastRunning();
logger.debug("STARTED " + activityDef.getAlias()); logger.debug("STARTED " + activityDef.getAlias());
@ -421,6 +422,7 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
this.exception = e; this.exception = e;
} finally { } finally {
stoppedAt=System.currentTimeMillis(); stoppedAt=System.currentTimeMillis();
unregisterMetrics();
activity.shutdownActivity(); activity.shutdownActivity();
activity.closeAutoCloseables(); activity.closeAutoCloseables();
ExecutionResult result = new ExecutionResult(startedAt, stoppedAt, "", exception); ExecutionResult result = new ExecutionResult(startedAt, stoppedAt, "", exception);
@ -446,19 +448,18 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
} }
} }
public synchronized void startActivity() { // public synchronized void startActivity() {
RunStateImage startable = tally.awaitNoneOther(1000L, RunState.Uninitialized, RunState.Stopped); // RunStateImage startable = tally.awaitNoneOther(1000L, RunState.Uninitialized, RunState.Stopped);
if (startable.isTimeout()) { // if (startable.isTimeout()) {
throw new RuntimeException("Unable to start activity '" + getActivity().getAlias() + "' which is in state " + startable); // throw new RuntimeException("Unable to start activity '" + getActivity().getAlias() + "' which is in state " + startable);
} // }
startMotorExecutorService(); // startMotorExecutorService();
startRunningActivityThreads(); // startRunningActivityThreads();
awaitMotorsAtLeastRunning(); // awaitMotorsAtLeastRunning();
registerMetrics(); // }
}
private void registerMetrics() { private void registerMetrics() {
this.threadsGauge= threadsGauge = ActivityMetrics.register(new NBFunctionGauge(activity, () -> (double) this.motors.size(), "threads")); this.threadsGauge = ActivityMetrics.register(new NBFunctionGauge(activity, () -> (double) this.motors.size(), "threads"));
} }
private void unregisterMetrics() { private void unregisterMetrics() {
ActivityMetrics.unregister(this.threadsGauge); ActivityMetrics.unregister(this.threadsGauge);
@ -485,7 +486,6 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
} finally { } finally {
logger.trace(() -> "finally shutting down activity " + this.getActivity().getAlias()); logger.trace(() -> "finally shutting down activity " + this.getActivity().getAlias());
this.stoppedAt = System.currentTimeMillis(); this.stoppedAt = System.currentTimeMillis();
unregisterMetrics();
activity.setRunState(RunState.Stopped); activity.setRunState(RunState.Stopped);
} }

View File

@ -16,8 +16,8 @@
package io.nosqlbench.engine.core; package io.nosqlbench.engine.core;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.engine.api.activityapi.core.*; import io.nosqlbench.engine.api.activityapi.core.*;
import io.nosqlbench.engine.api.activityapi.input.Input; import io.nosqlbench.engine.api.activityapi.input.Input;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser; import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
@ -37,11 +37,9 @@ import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Executors; import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Future;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail; import static org.assertj.core.api.Assertions.fail;
class ActivityExecutorTest { class ActivityExecutorTest {
@ -85,41 +83,41 @@ class ActivityExecutorTest {
// //
// } // }
@Test // @Test
synchronized void testDelayedStartSanity() { // synchronized void testDelayedStartSanity() {
//
ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;"); // ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;");
new ActivityTypeLoader().load(activityDef, NBLabeledElement.EMPTY); // new ActivityTypeLoader().load(activityDef, NBLabeledElement.EMPTY);
//
Activity activity = new DelayedInitActivity(activityDef); // Activity activity = new DelayedInitActivity(activityDef);
final InputDispenser inputDispenser = new CoreInputDispenser(activity); // final InputDispenser inputDispenser = new CoreInputDispenser(activity);
final ActionDispenser actionDispenser = new CoreActionDispenser(activity); // final ActionDispenser actionDispenser = new CoreActionDispenser(activity);
final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null); // final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null);
//
MotorDispenser<?> motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser); // MotorDispenser<?> motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser);
activity.setActionDispenserDelegate(actionDispenser); // activity.setActionDispenserDelegate(actionDispenser);
activity.setOutputDispenserDelegate(outputDispenser); // activity.setOutputDispenserDelegate(outputDispenser);
activity.setInputDispenserDelegate(inputDispenser); // activity.setInputDispenserDelegate(inputDispenser);
activity.setMotorDispenserDelegate(motorDispenser); // activity.setMotorDispenserDelegate(motorDispenser);
//
ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-delayed-start"); // ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-delayed-start");
//
ExecutorService testExecutor = Executors.newCachedThreadPool(); // ExecutorService testExecutor = Executors.newCachedThreadPool();
Future<ExecutionResult> future = testExecutor.submit(activityExecutor); // Future<ExecutionResult> future = testExecutor.submit(activityExecutor);
//
//
try { // try {
activityDef.setThreads(1); // activityDef.setThreads(1);
activityExecutor.startActivity(); // activityExecutor.startActivity();
future.get(); // future.get();
testExecutor.shutdownNow(); // testExecutor.shutdownNow();
//
} catch (final Exception e) { // } catch (final Exception e) {
fail("Unexpected exception", e); // fail("Unexpected exception", e);
} // }
//
assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull(); // assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull();
} // }
@Test @Test
synchronized void testNewActivityExecutor() { synchronized void testNewActivityExecutor() {
@ -144,7 +142,9 @@ class ActivityExecutorTest {
ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity, "test-new-executor"); ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity, "test-new-executor");
activityDef.setThreads(5); activityDef.setThreads(5);
activityExecutor.startActivity(); ForkJoinTask<ExecutionResult> executionResultForkJoinTask = ForkJoinPool.commonPool().submit(activityExecutor);
// activityExecutor.startActivity();
final int[] speeds = {1, 50, 5, 50, 2, 50}; final int[] speeds = {1, 50, 5, 50, 2, 50};
for (int offset = 0; offset < speeds.length; offset += 2) { for (int offset = 0; offset < speeds.length; offset += 2) {
@ -160,6 +160,7 @@ class ActivityExecutorTest {
fail("Not expecting exception", e); fail("Not expecting exception", e);
} }
} }
executionResultForkJoinTask.cancel(true);
// Used for slowing the roll due to state transitions in test. // Used for slowing the roll due to state transitions in test.
try { try {

View File

@ -34,6 +34,7 @@ import java.util.DoubleSummaryStatistics;
import java.util.List; import java.util.List;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.prefs.BackingStoreException;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public class ActivityMetrics { public class ActivityMetrics {
@ -116,6 +117,10 @@ public class ActivityMetrics {
labels = labelValidator != null ? labelValidator.apply(labels) : labels; labels = labelValidator != null ? labelValidator.apply(labels) : labels;
final String graphiteName = labels.linearizeValues('.', "[activity]", "[space]", "[op]", "name"); final String graphiteName = labels.linearizeValues('.', "[activity]", "[space]", "[op]", "name");
// String sanitized = sanitize(graphiteName);
// if (!graphiteName.equals(sanitized)) {
// throw new RuntimeException("Attempted to register a metric which was not compatible with labeled metric forms. Submitted as '" + graphiteName + "', but should likely be '" + sanitized + "'");
// }
Metric metric = get().getMetrics().get(graphiteName); Metric metric = get().getMetrics().get(graphiteName);
metric = get().getMetrics().get(graphiteName); metric = get().getMetrics().get(graphiteName);
@ -420,7 +425,7 @@ public class ActivityMetrics {
public static String sanitize(String word) { public static String sanitize(String word) {
String sanitized = word; String sanitized = word;
sanitized = sanitized.replaceAll("\\..+$", ""); sanitized = sanitized.replaceAll("\\.", "__");
sanitized = sanitized.replaceAll("-", "_"); sanitized = sanitized.replaceAll("-", "_");
sanitized = sanitized.replaceAll("[^a-zA-Z0-9_]+", ""); sanitized = sanitized.replaceAll("[^a-zA-Z0-9_]+", "");