mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Merge pull request #1565 from nosqlbench/nosqlbench-1536-morelabels
Nosqlbench 1536 morelabels
This commit is contained in:
commit
0e93dd7917
@ -92,8 +92,8 @@
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.bouncycastle</groupId>
|
||||
<artifactId>bcprov-jdk15on</artifactId>
|
||||
<version>1.70</version>
|
||||
<artifactId>bcprov-jdk18on</artifactId>
|
||||
<version>1.75</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>com.fasterxml.jackson.core</groupId>
|
||||
|
@ -57,6 +57,7 @@ public abstract class BaseOpDispenser<T extends Op, S> implements OpDispenser<T>
|
||||
private final String opName;
|
||||
protected final DriverAdapter<T, S> adapter;
|
||||
private final NBLabels labels;
|
||||
public final Timer verifierTimer;
|
||||
private boolean instrument;
|
||||
private Timer successTimer;
|
||||
private Timer errorTimer;
|
||||
@ -97,7 +98,8 @@ public abstract class BaseOpDispenser<T extends Op, S> implements OpDispenser<T>
|
||||
List<CycleFunction<Boolean>> verifiers = new ArrayList<>();
|
||||
verifiers = configureVerifiers(op);
|
||||
this._verifier = CycleFunctions.of((a, b) -> a && b, verifiers, true);
|
||||
this.tlVerifier = ThreadLocal.withInitial(() -> _verifier.newInstance());
|
||||
this.tlVerifier = ThreadLocal.withInitial(_verifier::newInstance);
|
||||
this.verifierTimer = ActivityMetrics.timer(this,"verifier",3);
|
||||
}
|
||||
|
||||
private CycleFunction<Boolean> cloneVerifiers() {
|
||||
|
@ -133,4 +133,6 @@ public interface ActivityInstrumentation {
|
||||
* @return a new or existing {@link Histogram}
|
||||
*/
|
||||
Histogram getOrCreateTriesHistogram();
|
||||
|
||||
Timer getOrCreateVerifierTimer();
|
||||
}
|
||||
|
@ -113,4 +113,10 @@ public class CoreActivityInstrumentation implements ActivityInstrumentation {
|
||||
public synchronized Histogram getOrCreateTriesHistogram() {
|
||||
return ActivityMetrics.histogram(this.activity,"tries", this.activity.getHdrDigits());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Timer getOrCreateVerifierTimer() {
|
||||
return ActivityMetrics.timer(this.activity,"verifier", this.activity.getHdrDigits());
|
||||
|
||||
}
|
||||
}
|
||||
|
@ -63,14 +63,15 @@ public class AtomicInput implements Input, ActivityDefObserver, Gauge<Long>, NBL
|
||||
this.parent = parent;
|
||||
this.activityDef = activityDef;
|
||||
onActivityDefUpdate(activityDef);
|
||||
ActivityMetrics.gauge(this, "input_cycles_first", new NBFunctionGauge(this, () -> (double)this.cycles_min.get()));
|
||||
ActivityMetrics.gauge(this, "input_cycles_last", new NBFunctionGauge(this, () -> (double)this.cycles_max.get()));
|
||||
ActivityMetrics.gauge(this, "input_cycle", new NBFunctionGauge(this, () -> (double)this.cycle_value.get()));
|
||||
ActivityMetrics.gauge(this, "input_cycles_total", new NBFunctionGauge(this, this::getTotalCycles));
|
||||
ActivityMetrics.gauge(this, "input_recycles_first", new NBFunctionGauge(this, () -> (double)this.recycles_min.get()));
|
||||
ActivityMetrics.gauge(this, "input_recycles_last", new NBFunctionGauge(this, () -> (double)this.recycles_max.get()));
|
||||
ActivityMetrics.gauge(this, "input_recycle", new NBFunctionGauge(this, () -> (double) this.recycle_value.get()));
|
||||
ActivityMetrics.gauge(this, "input_recycles_total", new NBFunctionGauge(this, this::getTotalRecycles));
|
||||
|
||||
ActivityMetrics.register(new NBFunctionGauge(this, () -> (double)this.cycles_min.get(),"input_cycles_first"));
|
||||
ActivityMetrics.register(new NBFunctionGauge(this, () -> (double)this.cycles_max.get(), "input_cycles_last"));
|
||||
ActivityMetrics.register(new NBFunctionGauge(this, () -> (double)this.cycle_value.get(), "input_cycle"));
|
||||
ActivityMetrics.register(new NBFunctionGauge(this, this::getTotalCycles, "input_cycles_total"));
|
||||
ActivityMetrics.register(new NBFunctionGauge(this, () -> (double)this.recycles_min.get(), "input_recycles_first"));
|
||||
ActivityMetrics.register(new NBFunctionGauge(this, () -> (double)this.recycles_max.get(), "input_recycles_last"));
|
||||
ActivityMetrics.register(new NBFunctionGauge(this, () -> (double) this.recycle_value.get(),"input_recycle"));
|
||||
ActivityMetrics.register(new NBFunctionGauge(this, this::getTotalRecycles,"input_recycles_total"));
|
||||
}
|
||||
|
||||
private double getTotalRecycles() {
|
||||
|
@ -98,7 +98,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
|
||||
for (OpTemplate ot : opTemplates) {
|
||||
ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(), List.of(), this);
|
||||
String driverName = incompleteOpDef.takeOptionalStaticValue("driver", String.class)
|
||||
.or(() -> incompleteOpDef.takeOptionalStaticValue("type",String.class))
|
||||
.or(() -> incompleteOpDef.takeOptionalStaticValue("type", String.class))
|
||||
.or(() -> defaultDriverOption)
|
||||
.orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
|
||||
|
||||
@ -152,12 +152,14 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
|
||||
throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e);
|
||||
}
|
||||
|
||||
this.pendingOpsGauge= ActivityMetrics.gauge(this,"ops_pending",
|
||||
new NBFunctionGauge(this,() -> this.getProgressMeter().getSummary().pending()));
|
||||
this.activeOpsGauge = ActivityMetrics.gauge(this,"ops_active",
|
||||
new NBFunctionGauge(this,() -> this.getProgressMeter().getSummary().current()));
|
||||
this.completeOpsGauge= ActivityMetrics.gauge(this,"ops_complete",
|
||||
new NBFunctionGauge(this,() -> this.getProgressMeter().getSummary().complete()));
|
||||
this.pendingOpsGauge = ActivityMetrics.register(
|
||||
new NBFunctionGauge(this, () -> this.getProgressMeter().getSummary().pending(), "ops_pending")
|
||||
);
|
||||
this.activeOpsGauge = ActivityMetrics.register(
|
||||
new NBFunctionGauge(this, () -> this.getProgressMeter().getSummary().current(),"ops_active")
|
||||
);
|
||||
this.completeOpsGauge = ActivityMetrics.register(
|
||||
new NBFunctionGauge(this, () -> this.getProgressMeter().getSummary().complete(),"ops_complete"));
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -217,7 +219,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
|
||||
public void shutdownActivity() {
|
||||
for (Map.Entry<String, DriverAdapter> entry : adapters.entrySet()) {
|
||||
String adapterName = entry.getKey();
|
||||
DriverAdapter<?,?> adapter = entry.getValue();
|
||||
DriverAdapter<?, ?> adapter = entry.getValue();
|
||||
adapter.getSpaceCache().getElements().forEach((spaceName, space) -> {
|
||||
if (space instanceof AutoCloseable autocloseable) {
|
||||
try {
|
||||
|
@ -59,6 +59,7 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
|
||||
private final NBErrorHandler errorHandler;
|
||||
private final OpSequence<OpDispenser<? extends Op>> opsequence;
|
||||
private final int maxTries;
|
||||
private final Timer verifierTimer;
|
||||
|
||||
public StandardAction(A activity, int slot) {
|
||||
this.activity = activity;
|
||||
@ -71,6 +72,7 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
|
||||
resultTimer = activity.getInstrumentation().getOrCreateResultTimer();
|
||||
resultSuccessTimer = activity.getInstrumentation().getOrCreateResultSuccessTimer();
|
||||
errorHandler = activity.getErrorHandler();
|
||||
verifierTimer = activity.getInstrumentation().getOrCreateVerifierTimer();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -109,16 +111,18 @@ public class StandardAction<A extends StandardActivity<R, ?>, R extends Op> impl
|
||||
"one of [RunnableOp, CycleOp, or ChainingOp]");
|
||||
}
|
||||
|
||||
CycleFunction<Boolean> verifier = dispenser.getVerifier();
|
||||
try {
|
||||
verifier.setVariable("result", result);
|
||||
verifier.setVariable("cycle",cycle);
|
||||
Boolean isGood = verifier.apply(cycle);
|
||||
if (!isGood) {
|
||||
throw new ResultVerificationError("result verification failed", maxTries - tries, verifier.getExpressionDetails());
|
||||
try (Timer.Context ignored = verifierTimer.time()) {
|
||||
CycleFunction<Boolean> verifier = dispenser.getVerifier();
|
||||
try {
|
||||
verifier.setVariable("result", result);
|
||||
verifier.setVariable("cycle",cycle);
|
||||
Boolean isGood = verifier.apply(cycle);
|
||||
if (!isGood) {
|
||||
throw new ResultVerificationError("result verification failed", maxTries - tries, verifier.getExpressionDetails());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new ResultVerificationError(e, maxTries - tries, verifier.getExpressionDetails());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new ResultVerificationError(e, maxTries - tries, verifier.getExpressionDetails());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
error = e;
|
||||
|
@ -41,7 +41,7 @@ public class ExceptionTimerMetrics {
|
||||
|
||||
this.allerrors = ActivityMetrics.timer(
|
||||
parentLabels,
|
||||
"errortimers.ALL",
|
||||
"errortimers_ALL",
|
||||
activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4)
|
||||
);
|
||||
}
|
||||
|
@ -55,4 +55,9 @@ public class ActivityMetricsTest {
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSanityCheckSanitize() {
|
||||
String result = ActivityMetrics.sanitize("test-dynamic-params.input_cycles_first");
|
||||
assertThat(result).isEqualTo("test_dynamic_params__input_cycles_first");
|
||||
}
|
||||
}
|
||||
|
@ -15,6 +15,9 @@
|
||||
*/
|
||||
package io.nosqlbench.engine.core.lifecycle.activity;
|
||||
|
||||
import com.codahale.metrics.Gauge;
|
||||
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
|
||||
import io.nosqlbench.api.engine.metrics.instruments.NBFunctionGauge;
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
import io.nosqlbench.api.labels.NBLabels;
|
||||
import io.nosqlbench.engine.api.activityapi.core.*;
|
||||
@ -72,6 +75,7 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
|
||||
private long stoppedAt = 0L;
|
||||
|
||||
private ActivityExecutorShutdownHook shutdownHook = null;
|
||||
private NBFunctionGauge threadsGauge;
|
||||
|
||||
public ActivityExecutor(Activity activity, String sessionId) {
|
||||
this.activity = activity;
|
||||
@ -409,6 +413,7 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
|
||||
// before threads start running such as metrics instruments
|
||||
activity.initActivity();
|
||||
startMotorExecutorService();
|
||||
registerMetrics();
|
||||
startRunningActivityThreads();
|
||||
awaitMotorsAtLeastRunning();
|
||||
logger.debug("STARTED " + activityDef.getAlias());
|
||||
@ -417,6 +422,7 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
|
||||
this.exception = e;
|
||||
} finally {
|
||||
stoppedAt=System.currentTimeMillis();
|
||||
unregisterMetrics();
|
||||
activity.shutdownActivity();
|
||||
activity.closeAutoCloseables();
|
||||
ExecutionResult result = new ExecutionResult(startedAt, stoppedAt, "", exception);
|
||||
@ -442,14 +448,22 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void startActivity() {
|
||||
RunStateImage startable = tally.awaitNoneOther(1000L, RunState.Uninitialized, RunState.Stopped);
|
||||
if (startable.isTimeout()) {
|
||||
throw new RuntimeException("Unable to start activity '" + getActivity().getAlias() + "' which is in state " + startable);
|
||||
}
|
||||
startMotorExecutorService();
|
||||
startRunningActivityThreads();
|
||||
awaitMotorsAtLeastRunning();
|
||||
// public synchronized void startActivity() {
|
||||
// RunStateImage startable = tally.awaitNoneOther(1000L, RunState.Uninitialized, RunState.Stopped);
|
||||
// if (startable.isTimeout()) {
|
||||
// throw new RuntimeException("Unable to start activity '" + getActivity().getAlias() + "' which is in state " + startable);
|
||||
// }
|
||||
// startMotorExecutorService();
|
||||
// startRunningActivityThreads();
|
||||
// awaitMotorsAtLeastRunning();
|
||||
// }
|
||||
|
||||
private void registerMetrics() {
|
||||
this.threadsGauge = ActivityMetrics.register(new NBFunctionGauge(activity, () -> (double) this.motors.size(), "threads"));
|
||||
}
|
||||
private void unregisterMetrics() {
|
||||
ActivityMetrics.unregister(this.threadsGauge);
|
||||
this.threadsGauge=null;
|
||||
}
|
||||
|
||||
private boolean shutdownExecutorService(int secondsToWait) {
|
||||
@ -567,6 +581,16 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
|
||||
.build());
|
||||
}
|
||||
|
||||
private class ThreadsGauge implements Gauge<Double> {
|
||||
public ThreadsGauge(ActivityExecutor activityExecutor) {
|
||||
ActivityExecutor ae = activityExecutor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Double getValue() {
|
||||
return (double) ActivityExecutor.this.motors.size();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -16,18 +16,18 @@
|
||||
|
||||
package io.nosqlbench.engine.core;
|
||||
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.engine.api.activityapi.core.*;
|
||||
import io.nosqlbench.engine.api.activityapi.input.Input;
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
import io.nosqlbench.engine.api.activityapi.core.ActionDispenser;
|
||||
import io.nosqlbench.engine.api.activityapi.core.Activity;
|
||||
import io.nosqlbench.engine.api.activityapi.core.MotorDispenser;
|
||||
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
|
||||
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
|
||||
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.CoreServices;
|
||||
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
|
||||
import io.nosqlbench.engine.api.activityimpl.action.CoreActionDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.input.AtomicInput;
|
||||
import io.nosqlbench.engine.api.activityimpl.input.CoreInputDispenser;
|
||||
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotor;
|
||||
import io.nosqlbench.engine.api.activityimpl.motor.CoreMotorDispenser;
|
||||
import io.nosqlbench.engine.core.lifecycle.ExecutionResult;
|
||||
import io.nosqlbench.engine.core.lifecycle.activity.ActivityExecutor;
|
||||
@ -37,11 +37,9 @@ import org.apache.logging.log4j.Logger;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.ForkJoinTask;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.fail;
|
||||
|
||||
class ActivityExecutorTest {
|
||||
@ -85,41 +83,41 @@ class ActivityExecutorTest {
|
||||
//
|
||||
// }
|
||||
|
||||
@Test
|
||||
synchronized void testDelayedStartSanity() {
|
||||
|
||||
ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;");
|
||||
new ActivityTypeLoader().load(activityDef, NBLabeledElement.EMPTY);
|
||||
|
||||
Activity activity = new DelayedInitActivity(activityDef);
|
||||
final InputDispenser inputDispenser = new CoreInputDispenser(activity);
|
||||
final ActionDispenser actionDispenser = new CoreActionDispenser(activity);
|
||||
final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null);
|
||||
|
||||
MotorDispenser<?> motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser);
|
||||
activity.setActionDispenserDelegate(actionDispenser);
|
||||
activity.setOutputDispenserDelegate(outputDispenser);
|
||||
activity.setInputDispenserDelegate(inputDispenser);
|
||||
activity.setMotorDispenserDelegate(motorDispenser);
|
||||
|
||||
ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-delayed-start");
|
||||
|
||||
ExecutorService testExecutor = Executors.newCachedThreadPool();
|
||||
Future<ExecutionResult> future = testExecutor.submit(activityExecutor);
|
||||
|
||||
|
||||
try {
|
||||
activityDef.setThreads(1);
|
||||
activityExecutor.startActivity();
|
||||
future.get();
|
||||
testExecutor.shutdownNow();
|
||||
|
||||
} catch (final Exception e) {
|
||||
fail("Unexpected exception", e);
|
||||
}
|
||||
|
||||
assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull();
|
||||
}
|
||||
// @Test
|
||||
// synchronized void testDelayedStartSanity() {
|
||||
//
|
||||
// ActivityDef activityDef = ActivityDef.parseActivityDef("driver=diag;alias=test-delayed-start;cycles=1000;initdelay=2000;");
|
||||
// new ActivityTypeLoader().load(activityDef, NBLabeledElement.EMPTY);
|
||||
//
|
||||
// Activity activity = new DelayedInitActivity(activityDef);
|
||||
// final InputDispenser inputDispenser = new CoreInputDispenser(activity);
|
||||
// final ActionDispenser actionDispenser = new CoreActionDispenser(activity);
|
||||
// final OutputDispenser outputDispenser = CoreServices.getOutputDispenser(activity).orElse(null);
|
||||
//
|
||||
// MotorDispenser<?> motorDispenser = new CoreMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser);
|
||||
// activity.setActionDispenserDelegate(actionDispenser);
|
||||
// activity.setOutputDispenserDelegate(outputDispenser);
|
||||
// activity.setInputDispenserDelegate(inputDispenser);
|
||||
// activity.setMotorDispenserDelegate(motorDispenser);
|
||||
//
|
||||
// ActivityExecutor activityExecutor = new ActivityExecutor(activity, "test-delayed-start");
|
||||
//
|
||||
// ExecutorService testExecutor = Executors.newCachedThreadPool();
|
||||
// Future<ExecutionResult> future = testExecutor.submit(activityExecutor);
|
||||
//
|
||||
//
|
||||
// try {
|
||||
// activityDef.setThreads(1);
|
||||
// activityExecutor.startActivity();
|
||||
// future.get();
|
||||
// testExecutor.shutdownNow();
|
||||
//
|
||||
// } catch (final Exception e) {
|
||||
// fail("Unexpected exception", e);
|
||||
// }
|
||||
//
|
||||
// assertThat(inputDispenser.getInput(10).getInputSegment(3)).isNull();
|
||||
// }
|
||||
|
||||
@Test
|
||||
synchronized void testNewActivityExecutor() {
|
||||
@ -129,7 +127,7 @@ class ActivityExecutorTest {
|
||||
|
||||
Activity simpleActivity = new SimpleActivity(activityDef, NBLabeledElement.forMap(Map.of()));
|
||||
|
||||
this.getActivityMotorFactory(this.motorActionDelay(999), new AtomicInput(simpleActivity,activityDef));
|
||||
// this.getActivityMotorFactory(this.motorActionDelay(999), new AtomicInput(simpleActivity,activityDef));
|
||||
|
||||
final InputDispenser inputDispenser = new CoreInputDispenser(simpleActivity);
|
||||
final ActionDispenser actionDispenser = new CoreActionDispenser(simpleActivity);
|
||||
@ -144,7 +142,9 @@ class ActivityExecutorTest {
|
||||
|
||||
ActivityExecutor activityExecutor = new ActivityExecutor(simpleActivity, "test-new-executor");
|
||||
activityDef.setThreads(5);
|
||||
activityExecutor.startActivity();
|
||||
ForkJoinTask<ExecutionResult> executionResultForkJoinTask = ForkJoinPool.commonPool().submit(activityExecutor);
|
||||
|
||||
// activityExecutor.startActivity();
|
||||
|
||||
final int[] speeds = {1, 50, 5, 50, 2, 50};
|
||||
for (int offset = 0; offset < speeds.length; offset += 2) {
|
||||
@ -160,6 +160,7 @@ class ActivityExecutorTest {
|
||||
fail("Not expecting exception", e);
|
||||
}
|
||||
}
|
||||
executionResultForkJoinTask.cancel(true);
|
||||
|
||||
// Used for slowing the roll due to state transitions in test.
|
||||
try {
|
||||
@ -170,18 +171,6 @@ class ActivityExecutorTest {
|
||||
}
|
||||
}
|
||||
|
||||
private MotorDispenser<?> getActivityMotorFactory(final Action lc, Input ls) {
|
||||
return new MotorDispenser<>() {
|
||||
@Override
|
||||
public Motor getMotor(final ActivityDef activityDef, final int slotId) {
|
||||
final Activity activity = new SimpleActivity(activityDef, NBLabeledElement.forMap(Map.of()));
|
||||
final Motor<?> cm = new CoreMotor<>(activity, slotId, ls);
|
||||
cm.setAction(lc);
|
||||
return cm;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private SyncAction motorActionDelay(long delay) {
|
||||
return new SyncAction() {
|
||||
@Override
|
||||
|
@ -26,7 +26,7 @@
|
||||
|
||||
<properties>
|
||||
|
||||
<revision>5.21.1-SNAPSHOT</revision>
|
||||
<revision>5.17.4-SNAPSHOT</revision>
|
||||
<!-- Set this level to override the logging level for tests during build -->
|
||||
<project.testlevel>INFO</project.testlevel>
|
||||
<!-- Set this level to override the logging level for tests logging configuration during build -->
|
||||
|
@ -34,6 +34,7 @@ import java.util.DoubleSummaryStatistics;
|
||||
import java.util.List;
|
||||
import java.util.ServiceLoader;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.prefs.BackingStoreException;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class ActivityMetrics {
|
||||
@ -76,10 +77,10 @@ public class ActivityMetrics {
|
||||
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
|
||||
private static Metric register(NBLabels labels, MetricProvider metricProvider) {
|
||||
|
||||
labels = labelFilter!=null ? labelFilter.apply(labels) : labels;
|
||||
labels = labelFilter != null ? labelFilter.apply(labels) : labels;
|
||||
labels = labelValidator != null ? labelValidator.apply(labels) : labels;
|
||||
|
||||
final String graphiteName = labels.linearizeValues('.',"[activity]","[space]","[op]","name");
|
||||
final String graphiteName = labels.linearizeValues('.', "[activity]", "[space]", "[op]", "name");
|
||||
Metric metric = get().getMetrics().get(graphiteName);
|
||||
|
||||
if (null == metric) {
|
||||
@ -96,6 +97,49 @@ public class ActivityMetrics {
|
||||
return metric;
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls to this version of register must be done with a pre-built metric instrument.
|
||||
* This means that it is not suitable for lazily creating metric objects directly on
|
||||
* instances which are one of many. Instead, call this to register metrics at the start
|
||||
* of an owning element.
|
||||
*
|
||||
* This version of register expects that you have fully labeled a metric, including
|
||||
* addint the 'name' field, also known as the <em>metric family name</em> in some specifications.
|
||||
*
|
||||
* It is due to be replaced by a different registry format soon.
|
||||
*
|
||||
* @param labeledMetric
|
||||
* @return the metric instance
|
||||
*/
|
||||
public static <M extends NBLabeledMetric> M register(M labeledMetric) {
|
||||
NBLabels labels = labeledMetric.getLabels();
|
||||
labels = labelFilter != null ? labelFilter.apply(labels) : labels;
|
||||
labels = labelValidator != null ? labelValidator.apply(labels) : labels;
|
||||
|
||||
final String graphiteName = labels.linearizeValues('.', "[activity]", "[space]", "[op]", "name");
|
||||
// String sanitized = sanitize(graphiteName);
|
||||
// if (!graphiteName.equals(sanitized)) {
|
||||
// throw new RuntimeException("Attempted to register a metric which was not compatible with labeled metric forms. Submitted as '" + graphiteName + "', but should likely be '" + sanitized + "'");
|
||||
// }
|
||||
Metric metric = get().getMetrics().get(graphiteName);
|
||||
|
||||
metric = get().getMetrics().get(graphiteName);
|
||||
if (metric!=null) {
|
||||
logger.warn("Metric already registered for '" + graphiteName + "', probably logic error which could invalidate metric values.");
|
||||
} else {
|
||||
get().register(graphiteName, labeledMetric);
|
||||
}
|
||||
return labeledMetric;
|
||||
}
|
||||
|
||||
public static void unregister(NBLabeledElement element) {
|
||||
final String graphiteName = element.getLabels().linearizeValues('.', "[activity]", "[space]", "[op]", "name");
|
||||
if (!get().getMetrics().containsKey(graphiteName)) {
|
||||
logger.warn("Removing non-extant metric by name: '"+ graphiteName + "'");
|
||||
}
|
||||
get().remove(graphiteName);
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Create a timer associated with an activity.</p>
|
||||
*
|
||||
@ -112,7 +156,7 @@ public class ActivityMetrics {
|
||||
* @return the timer, perhaps a different one if it has already been registered
|
||||
*/
|
||||
public static Timer timer(NBLabeledElement parent, String metricFamilyName, int hdrdigits) {
|
||||
final NBLabels labels = parent.getLabels().and("name",sanitize(metricFamilyName));
|
||||
final NBLabels labels = parent.getLabels().and("name", sanitize(metricFamilyName));
|
||||
|
||||
|
||||
Timer registeredTimer = (Timer) register(labels, () ->
|
||||
@ -164,7 +208,7 @@ public class ActivityMetrics {
|
||||
* @return the counter, perhaps a different one if it has already been registered
|
||||
*/
|
||||
public static Counter counter(NBLabeledElement parent, String metricFamilyName) {
|
||||
final NBLabels labels = parent.getLabels().and("name",metricFamilyName);
|
||||
final NBLabels labels = parent.getLabels().and("name", metricFamilyName);
|
||||
return (Counter) register(labels, () -> new NBMetricCounter(labels));
|
||||
}
|
||||
|
||||
@ -180,7 +224,7 @@ public class ActivityMetrics {
|
||||
* @return the meter, perhaps a different one if it has already been registered
|
||||
*/
|
||||
public static Meter meter(NBLabeledElement parent, String metricFamilyName) {
|
||||
final NBLabels labels = parent.getLabels().and("name",sanitize(metricFamilyName));
|
||||
final NBLabels labels = parent.getLabels().and("name", sanitize(metricFamilyName));
|
||||
return (Meter) register(labels, () -> new NBMetricMeter(labels));
|
||||
}
|
||||
|
||||
@ -201,27 +245,30 @@ public class ActivityMetrics {
|
||||
* and so on. It uses the same data reservoir for all views, but only returns one of them as a handle to the metric.
|
||||
* This has the effect of leaving some of the metric objects unreferencable from the caller side. This may need
|
||||
* to be changed in a future update in the even that full inventory management is required on metric objects here.
|
||||
* @param parent The labeled element the metric pertains to
|
||||
* @param metricFamilyName The name of the measurement
|
||||
*
|
||||
* @param parent
|
||||
* The labeled element the metric pertains to
|
||||
* @param metricFamilyName
|
||||
* The name of the measurement
|
||||
* @return One of the created metrics, suitable for calling {@link DoubleSummaryGauge#accept(double)} on.
|
||||
*/
|
||||
public static DoubleSummaryGauge summaryGauge(NBLabeledElement parent, String metricFamilyName) {
|
||||
DoubleSummaryStatistics stats = new DoubleSummaryStatistics();
|
||||
DoubleSummaryGauge anyGauge = null;
|
||||
for (DoubleSummaryGauge.Stat statName: DoubleSummaryGauge.Stat.values()){
|
||||
for (DoubleSummaryGauge.Stat statName : DoubleSummaryGauge.Stat.values()) {
|
||||
final NBLabels labels = parent.getLabels()
|
||||
.and("name",sanitize(metricFamilyName))
|
||||
.modifyValue("name", n -> n+"_"+statName.name().toLowerCase());
|
||||
anyGauge= (DoubleSummaryGauge) register(labels, () -> new DoubleSummaryGauge(labels,statName,stats));
|
||||
.and("name", sanitize(metricFamilyName))
|
||||
.modifyValue("name", n -> n + "_" + statName.name().toLowerCase());
|
||||
anyGauge = (DoubleSummaryGauge) register(labels, () -> new DoubleSummaryGauge(labels, statName, stats));
|
||||
}
|
||||
return anyGauge;
|
||||
}
|
||||
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> Gauge<T> gauge(NBLabeledElement parent, String metricFamilyName, Gauge<T> gauge) {
|
||||
final NBLabels labels = parent.getLabels().and("name",sanitize(metricFamilyName));
|
||||
|
||||
return (Gauge<T>) register(labels, () -> new NBMetricGaugeWrapper<>(labels,gauge));
|
||||
final NBLabels labels = parent.getLabels().and("name", sanitize(metricFamilyName));
|
||||
return (Gauge<T>) register(labels, () -> new NBMetricGaugeWrapper<>(labels, gauge));
|
||||
}
|
||||
|
||||
private static MetricRegistry lookupRegistry() {
|
||||
@ -375,10 +422,11 @@ public class ActivityMetrics {
|
||||
.forEach(get()::remove);
|
||||
}
|
||||
|
||||
|
||||
public static String sanitize(String word) {
|
||||
String sanitized = word;
|
||||
sanitized = sanitized.replaceAll("\\..+$", "");
|
||||
sanitized = sanitized.replaceAll("-","_");
|
||||
sanitized = sanitized.replaceAll("\\.", "__");
|
||||
sanitized = sanitized.replaceAll("-", "_");
|
||||
sanitized = sanitized.replaceAll("[^a-zA-Z0-9_]+", "");
|
||||
|
||||
if (!word.equals(sanitized)) {
|
||||
|
@ -19,6 +19,7 @@ package io.nosqlbench.api.engine.metrics.instruments;
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
import io.nosqlbench.api.labels.NBLabels;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class NBFunctionGauge implements NBMetricGauge<Double> {
|
||||
@ -27,11 +28,14 @@ public class NBFunctionGauge implements NBMetricGauge<Double> {
|
||||
private final NBLabeledElement parent;
|
||||
private final NBLabels labels;
|
||||
|
||||
public NBFunctionGauge(NBLabeledElement parent, Supplier<Double> source, Object... labels) {
|
||||
public NBFunctionGauge(NBLabeledElement parent, Supplier<Double> source, String metricFamilyName, Map<String,String> additionalLabels) {
|
||||
this.parent = parent;
|
||||
this.labels = NBLabels.forKV(labels);
|
||||
this.labels = NBLabels.forMap(additionalLabels).and("name",metricFamilyName);
|
||||
this.source = source;
|
||||
}
|
||||
public NBFunctionGauge(NBLabeledElement parent, Supplier<Double> source, String metricFamilyName) {
|
||||
this(parent, source, metricFamilyName,Map.of());
|
||||
}
|
||||
@Override
|
||||
public Double getValue() {
|
||||
return source.get();
|
||||
|
@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Copyright (c) 2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.api.engine.metrics.instruments;
|
||||
|
||||
import com.codahale.metrics.Metric;
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
|
||||
public interface NBLabeledMetric extends Metric, NBLabeledElement {
|
||||
}
|
@ -17,10 +17,9 @@
|
||||
package io.nosqlbench.api.engine.metrics.instruments;
|
||||
|
||||
import com.codahale.metrics.Counter;
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
import io.nosqlbench.api.labels.NBLabels;
|
||||
|
||||
public class NBMetricCounter extends Counter implements NBLabeledElement {
|
||||
public class NBMetricCounter extends Counter implements NBLabeledMetric {
|
||||
|
||||
private final NBLabels labels;
|
||||
|
||||
|
@ -17,8 +17,7 @@
|
||||
package io.nosqlbench.api.engine.metrics.instruments;
|
||||
|
||||
import com.codahale.metrics.Gauge;
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
|
||||
public interface NBMetricGauge<T> extends Gauge<T>, NBLabeledElement {
|
||||
public interface NBMetricGauge<T> extends Gauge<T>, NBLabeledMetric {
|
||||
|
||||
}
|
||||
|
@ -17,15 +17,14 @@
|
||||
package io.nosqlbench.api.engine.metrics.instruments;
|
||||
|
||||
import com.codahale.metrics.Histogram;
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
import io.nosqlbench.api.labels.NBLabels;
|
||||
import io.nosqlbench.api.engine.metrics.*;
|
||||
import io.nosqlbench.api.labels.NBLabels;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
|
||||
public class NBMetricHistogram extends Histogram implements DeltaSnapshotter, HdrDeltaHistogramAttachment, HistogramAttachment, NBLabeledElement {
|
||||
public class NBMetricHistogram extends Histogram implements DeltaSnapshotter, HdrDeltaHistogramAttachment, HistogramAttachment, NBLabeledMetric {
|
||||
|
||||
private final DeltaHdrHistogramReservoir hdrDeltaReservoir;
|
||||
private final NBLabels labels;
|
||||
|
@ -17,10 +17,9 @@
|
||||
package io.nosqlbench.api.engine.metrics.instruments;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
import io.nosqlbench.api.labels.NBLabels;
|
||||
|
||||
public class NBMetricMeter extends Meter implements NBLabeledElement {
|
||||
public class NBMetricMeter extends Meter implements NBLabeledMetric {
|
||||
|
||||
private final NBLabels labels;
|
||||
|
||||
|
@ -17,16 +17,15 @@
|
||||
package io.nosqlbench.api.engine.metrics.instruments;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.api.labels.NBLabeledElement;
|
||||
import io.nosqlbench.api.labels.NBLabels;
|
||||
import io.nosqlbench.api.engine.metrics.*;
|
||||
import io.nosqlbench.api.labels.NBLabels;
|
||||
import org.HdrHistogram.Histogram;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class NBMetricTimer extends Timer implements DeltaSnapshotter, HdrDeltaHistogramAttachment, TimerAttachment, NBLabeledElement {
|
||||
public class NBMetricTimer extends Timer implements DeltaSnapshotter, HdrDeltaHistogramAttachment, TimerAttachment, NBLabeledMetric {
|
||||
private final DeltaHdrHistogramReservoir deltaHdrHistogramReservoir;
|
||||
private long cacheExpiry;
|
||||
private List<Timer> mirrors;
|
||||
|
Loading…
Reference in New Issue
Block a user