Metrics Publishing (#1234)

Included the following with core changes to allow labeled metrics for Prometheus exposition format publishing.
* javadoc updates
* remove extra types
* use NBLabeledElement instead of NBNamedElement
* contextualize NBLabeledElement for graphite/metrics
* externalize labeled ScriptContext to API
* add labels to NicerTimer
* remove partial packaging
* more progress on formatting for prom exposition format
* added metrics diagram
* resolve build issues with label params
* resolve build issues with label params
* prometheus export services
* added PromExpoFormat Tests for NBMetricMeter(Counting+Sampling) and NBMetricTimer(Counting)
* added test for Gauge Formatting
* added Gauge Formatting as well as Sampling values (count, stdev ...)
* added sketch for metrics labeling contexts
* add NBLabeledElement in all the places, retool calling paths to use it
* synchronize antlr versions after partial snyk change
* unbreak static initializer block after IntelliJ "fixed" it.
* engine-api - adapt to NBLabeledElement
* adapters-api - adapt to NBLabeledElement
* nb-api - adapt to NBLabeledElement
* engine-core - adapt to NBLabeledElement
* misc-adapters - adapt to NBLabeledElement
* streaming-adapters - adapt to NBLabeledElement
* add missing test
* initial implementation of a prom push reporter
* Resolve build issue with parseGlobalOptions
* replaced with PromPushReporter
* cleanup unused deps
* dependency removal for micrometer
* allow empty labels for tests
* change space.getName to space.getSpaceName
* cleanup poms
* graphite linearization now includes space element
* http adapter should only depend on adapters API
* http space does not create its own metric names
* import cleanups
* improved javadocs
* introduce component concepts

---------

Co-authored-by: Jonathan Shook <jshook@gmail.com>
Co-authored-by: Mike Yaacoub <mike.yaacoub@datastax.com>
This commit is contained in:
Jeff Banks
2023-05-09 09:52:42 -05:00
committed by GitHub
parent bca2d8a2ee
commit 02ff160b3c
149 changed files with 5713 additions and 4551 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,7 +17,9 @@
package io.nosqlbench.engine.api.activityapi.core;
import com.codahale.metrics.Timer;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressCapable;
import io.nosqlbench.engine.api.activityapi.core.progress.StateCapable;
import io.nosqlbench.engine.api.activityapi.cyclelog.filters.IntPredicateDispenser;
@@ -25,8 +27,6 @@ import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
@@ -38,7 +38,7 @@ import java.util.function.Supplier;
* Provides the components needed to build and run an activity a runtime.
* The easiest way to build a useful Activity is to extend {@link SimpleActivity}.
*/
public interface Activity extends Comparable<Activity>, ActivityDefObserver, ProgressCapable, StateCapable, NBNamedElement {
public interface Activity extends Comparable<Activity>, ActivityDefObserver, ProgressCapable, StateCapable, NBLabeledElement {
/**
* Provide the activity with the controls needed to stop itself.
@@ -59,11 +59,11 @@ public interface Activity extends Comparable<Activity>, ActivityDefObserver, Pro
ActivityDef getActivityDef();
default String getAlias() {
return getActivityDef().getAlias();
return this.getActivityDef().getAlias();
}
default ParameterMap getParams() {
return getActivityDef().getParams();
return this.getActivityDef().getParams();
}
default void initActivity() {
@@ -94,6 +94,7 @@ public interface Activity extends Comparable<Activity>, ActivityDefObserver, Pro
void setOutputDispenserDelegate(OutputDispenser outputDispenser);
@Override
RunState getRunState();
void setRunState(RunState runState);
@@ -104,7 +105,7 @@ public interface Activity extends Comparable<Activity>, ActivityDefObserver, Pro
}
default String getCycleSummary() {
return getActivityDef().getCycleSummary();
return this.getActivityDef().getCycleSummary();
}
/**
@@ -214,7 +215,7 @@ public interface Activity extends Comparable<Activity>, ActivityDefObserver, Pro
int getMaxTries();
default int getHdrDigits() {
return getParams().getOptionalInteger("hdr_digits").orElse(4);
return this.getParams().getOptionalInteger("hdr_digits").orElse(4);
}
RunStateTally getRunStateTally();

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
package io.nosqlbench.engine.api.activityapi.core;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
@@ -46,8 +47,8 @@ public interface ActivityType<A extends Activity> {
* @return a distinct Activity instance for each call
*/
@SuppressWarnings("unchecked")
default A getActivity(ActivityDef activityDef) {
SimpleActivity activity = new SimpleActivity(activityDef);
default A getActivity(final ActivityDef activityDef, final NBLabeledElement parentLabels) {
final SimpleActivity activity = new SimpleActivity(activityDef, parentLabels);
return (A) activity;
}
@@ -59,31 +60,25 @@ public interface ActivityType<A extends Activity> {
* @param activities a map of existing activities
* @return a distinct activity instance for each call
*/
default Activity getAssembledActivity(ActivityDef activityDef, Map<String, Activity> activities) {
A activity = getActivity(activityDef);
default Activity getAssembledActivity(final ActivityDef activityDef, final Map<String, Activity> activities, final NBLabeledElement labels) {
final A activity = this.getActivity(activityDef, labels);
InputDispenser inputDispenser = getInputDispenser(activity);
if (inputDispenser instanceof ActivitiesAware) {
((ActivitiesAware) inputDispenser).setActivitiesMap(activities);
}
final InputDispenser inputDispenser = this.getInputDispenser(activity);
if (inputDispenser instanceof ActivitiesAware) ((ActivitiesAware) inputDispenser).setActivitiesMap(activities);
activity.setInputDispenserDelegate(inputDispenser);
ActionDispenser actionDispenser = getActionDispenser(activity);
if (actionDispenser instanceof ActivitiesAware) {
final ActionDispenser actionDispenser = this.getActionDispenser(activity);
if (actionDispenser instanceof ActivitiesAware)
((ActivitiesAware) actionDispenser).setActivitiesMap(activities);
}
activity.setActionDispenserDelegate(actionDispenser);
OutputDispenser outputDispenser = getOutputDispenser(activity).orElse(null);
if (outputDispenser !=null && outputDispenser instanceof ActivitiesAware) {
final OutputDispenser outputDispenser = this.getOutputDispenser(activity).orElse(null);
if ((null != outputDispenser) && (outputDispenser instanceof ActivitiesAware))
((ActivitiesAware) outputDispenser).setActivitiesMap(activities);
}
activity.setOutputDispenserDelegate(outputDispenser);
MotorDispenser motorDispenser = getMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser);
if (motorDispenser instanceof ActivitiesAware) {
((ActivitiesAware) motorDispenser).setActivitiesMap(activities);
}
final MotorDispenser motorDispenser = this.getMotorDispenser(activity, inputDispenser, actionDispenser, outputDispenser);
if (motorDispenser instanceof ActivitiesAware) ((ActivitiesAware) motorDispenser).setActivitiesMap(activities);
activity.setMotorDispenserDelegate(motorDispenser);
return activity;
@@ -95,7 +90,7 @@ public interface ActivityType<A extends Activity> {
* @param activity The activity instance that will parameterize the returned MarkerDispenser instance.
* @return an instance of MarkerDispenser
*/
default Optional<OutputDispenser> getOutputDispenser(A activity) {
default Optional<OutputDispenser> getOutputDispenser(final A activity) {
return CoreServices.getOutputDispenser(activity);
}
@@ -105,7 +100,7 @@ public interface ActivityType<A extends Activity> {
* @param activity The activity instance that will parameterize the returned ActionDispenser instance.
* @return an instance of ActionDispenser
*/
default ActionDispenser getActionDispenser(A activity) {
default ActionDispenser getActionDispenser(final A activity) {
return new CoreActionDispenser(activity);
}
@@ -116,15 +111,15 @@ public interface ActivityType<A extends Activity> {
* @param activity the Activity instance which will parameterize this InputDispenser
* @return the InputDispenser for the associated activity
*/
default InputDispenser getInputDispenser(A activity) {
default InputDispenser getInputDispenser(final A activity) {
return CoreServices.getInputDispenser(activity);
}
default <T> MotorDispenser<T> getMotorDispenser(
A activity,
InputDispenser inputDispenser,
ActionDispenser actionDispenser,
OutputDispenser outputDispenser) {
final A activity,
final InputDispenser inputDispenser,
final ActionDispenser actionDispenser,
final OutputDispenser outputDispenser) {
return new CoreMotorDispenser<T> (activity, inputDispenser, actionDispenser, outputDispenser);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -37,84 +37,80 @@ public class CoreActivityInstrumentation implements ActivityInstrumentation {
private final String svcTimeSuffix;
private final boolean strictNaming;
public CoreActivityInstrumentation(Activity activity) {
public CoreActivityInstrumentation(final Activity activity) {
this.activity = activity;
this.def = activity.getActivityDef();
this.params = def.getParams();
this.strictNaming = params.getOptionalBoolean(STRICTMETRICNAMES).orElse(true);
svcTimeSuffix = strictNaming ? SERVICE_TIME : "";
def = activity.getActivityDef();
params = this.def.getParams();
strictNaming = this.params.getOptionalBoolean(CoreActivityInstrumentation.STRICTMETRICNAMES).orElse(true);
this.svcTimeSuffix = this.strictNaming ? CoreActivityInstrumentation.SERVICE_TIME : "";
}
@Override
public synchronized Timer getOrCreateInputTimer() {
String metricName = "read_input";
return ActivityMetrics.timer(def, metricName, activity.getHdrDigits());
final String metricName = "read_input";
return ActivityMetrics.timer(this.activity, metricName, this.activity.getHdrDigits());
}
@Override
public synchronized Timer getOrCreateStridesServiceTimer() {
return ActivityMetrics.timer(def, "strides" + SERVICE_TIME, activity.getHdrDigits());
return ActivityMetrics.timer(this.activity, "strides" + CoreActivityInstrumentation.SERVICE_TIME, this.activity.getHdrDigits());
}
@Override
public synchronized Timer getStridesResponseTimerOrNull() {
if (activity.getStrideLimiter()==null) {
return null;
}
return ActivityMetrics.timer(def, "strides" + RESPONSE_TIME, activity.getHdrDigits());
if (null == activity.getStrideLimiter()) return null;
return ActivityMetrics.timer(this.activity, "strides" + CoreActivityInstrumentation.RESPONSE_TIME, this.activity.getHdrDigits());
}
@Override
public synchronized Timer getOrCreateCyclesServiceTimer() {
return ActivityMetrics.timer(def, "cycles" + svcTimeSuffix, activity.getHdrDigits());
return ActivityMetrics.timer(this.activity, "cycles" + this.svcTimeSuffix, this.activity.getHdrDigits());
}
@Override
public synchronized Timer getCyclesResponseTimerOrNull() {
if (activity.getCycleLimiter()==null) {
return null;
}
String metricName = "cycles" + RESPONSE_TIME;
return ActivityMetrics.timer(def, metricName, activity.getHdrDigits());
if (null == activity.getCycleLimiter()) return null;
final String metricName = "cycles" + CoreActivityInstrumentation.RESPONSE_TIME;
return ActivityMetrics.timer(this.activity, metricName, this.activity.getHdrDigits());
}
@Override
public synchronized Counter getOrCreatePendingOpCounter() {
String metricName = "pending_ops";
return ActivityMetrics.counter(def, metricName);
final String metricName = "pending_ops";
return ActivityMetrics.counter(this.activity, metricName);
}
@Override
public synchronized Counter getOrCreateOpTrackerBlockedCounter() {
String metricName = "optracker_blocked";
return ActivityMetrics.counter(def, metricName);
final String metricName = "optracker_blocked";
return ActivityMetrics.counter(this.activity, metricName);
}
@Override
public synchronized Timer getOrCreateBindTimer() {
return ActivityMetrics.timer(def, "bind", activity.getHdrDigits());
return ActivityMetrics.timer(this.activity, "bind", this.activity.getHdrDigits());
}
@Override
public synchronized Timer getOrCreateExecuteTimer() {
return ActivityMetrics.timer(def,"execute", activity.getHdrDigits());
return ActivityMetrics.timer(this.activity,"execute", this.activity.getHdrDigits());
}
@Override
public synchronized Timer getOrCreateResultTimer() {
return ActivityMetrics.timer(def,"result", activity.getHdrDigits());
return ActivityMetrics.timer(this.activity,"result", this.activity.getHdrDigits());
}
@Override
public synchronized Timer getOrCreateResultSuccessTimer() {
return ActivityMetrics.timer(def,"result-success", activity.getHdrDigits());
return ActivityMetrics.timer(this.activity,"result-success", this.activity.getHdrDigits());
}
@Override
public synchronized Histogram getOrCreateTriesHistogram() {
return ActivityMetrics.histogram(def,"tries", activity.getHdrDigits());
return ActivityMetrics.histogram(this.activity,"tries", this.activity.getHdrDigits());
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@
package io.nosqlbench.engine.api.activityapi.errorhandling;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.metrics.ExceptionCountMetrics;
import io.nosqlbench.engine.api.metrics.ExceptionHistoMetrics;
@@ -26,42 +27,36 @@ import java.util.function.Supplier;
public class ErrorMetrics {
private final ActivityDef activityDef;
private final NBLabeledElement parentLabels;
private ExceptionCountMetrics exceptionCountMetrics;
private ExceptionHistoMetrics exceptionHistoMetrics;
private ExceptionMeterMetrics exceptionMeterMetrics;
private ExceptionTimerMetrics exceptionTimerMetrics;
public ErrorMetrics(ActivityDef activityDef) {
this.activityDef = activityDef;
public ErrorMetrics(final NBLabeledElement parentLabels) {
this.parentLabels = parentLabels;
}
public synchronized ExceptionCountMetrics getExceptionCountMetrics() {
if (exceptionCountMetrics == null) {
exceptionCountMetrics = new ExceptionCountMetrics(activityDef);
}
return exceptionCountMetrics;
if (null == exceptionCountMetrics) this.exceptionCountMetrics = new ExceptionCountMetrics(this.parentLabels);
return this.exceptionCountMetrics;
}
public synchronized ExceptionHistoMetrics getExceptionHistoMetrics() {
if (exceptionHistoMetrics == null) {
exceptionHistoMetrics = new ExceptionHistoMetrics(activityDef);
}
return exceptionHistoMetrics;
if (null == exceptionHistoMetrics)
this.exceptionHistoMetrics = new ExceptionHistoMetrics(this.parentLabels, ActivityDef.parseActivityDef(""));
return this.exceptionHistoMetrics;
}
public synchronized ExceptionMeterMetrics getExceptionMeterMetrics() {
if (exceptionMeterMetrics == null) {
exceptionMeterMetrics = new ExceptionMeterMetrics(activityDef);
}
return exceptionMeterMetrics;
if (null == exceptionMeterMetrics) this.exceptionMeterMetrics = new ExceptionMeterMetrics(this.parentLabels);
return this.exceptionMeterMetrics;
}
public synchronized ExceptionTimerMetrics getExceptionTimerMetrics() {
if (exceptionTimerMetrics == null) {
exceptionTimerMetrics = new ExceptionTimerMetrics(activityDef);
}
return exceptionTimerMetrics;
if (null == exceptionTimerMetrics)
this.exceptionTimerMetrics = new ExceptionTimerMetrics(this.parentLabels, ActivityDef.parseActivityDef(""));
return this.exceptionTimerMetrics;
}
public interface Aware {

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,9 +17,13 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import com.codahale.metrics.Gauge;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.engine.api.activityapi.core.Startable;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiters.BurstRateGauge;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiters.RateGauge;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiters.WaitTimeGauge;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -77,10 +81,10 @@ import java.util.concurrent.atomic.AtomicLong;
* </p>
*/
@Service(value = RateLimiter.class, selector = "hybrid")
public class HybridRateLimiter implements Startable, RateLimiter {
public class HybridRateLimiter implements RateLimiter {
private final static Logger logger = LogManager.getLogger(HybridRateLimiter.class);
private NBNamedElement named;
private static final Logger logger = LogManager.getLogger(HybridRateLimiter.class);
private NBLabeledElement named;
//private volatile TokenFiller filler;
private volatile long starttime;
@@ -104,93 +108,87 @@ public class HybridRateLimiter implements Startable, RateLimiter {
protected HybridRateLimiter() {
}
public HybridRateLimiter(NBNamedElement named, String label, RateSpec rateSpec) {
setLabel(label);
init(named);
public HybridRateLimiter(final NBLabeledElement named, final String label, final RateSpec rateSpec) {
this.label = label;
this.init(named);
this.named = named;
this.applyRateSpec(rateSpec);
applyRateSpec(rateSpec);
}
protected void setLabel(String label) {
protected void setLabel(final String label) {
this.label = label;
}
@Override
public long maybeWaitForOp() {
return tokens.blockAndTake();
return this.tokens.blockAndTake();
}
@Override
public long getTotalWaitTime() {
return this.cumulativeWaitTimeNanos.get() + getWaitTime();
return cumulativeWaitTimeNanos.get() + this.getWaitTime();
}
@Override
public long getWaitTime() {
return tokens.getWaitTime();
return this.tokens.getWaitTime();
}
@Override
public RateSpec getRateSpec() {
return this.rateSpec;
return rateSpec;
}
@Override
public synchronized void applyRateSpec(RateSpec updatingRateSpec) {
public synchronized void applyRateSpec(final RateSpec updatingRateSpec) {
if (updatingRateSpec == null) {
throw new RuntimeException("RateSpec must be defined");
}
if (null == updatingRateSpec) throw new RuntimeException("RateSpec must be defined");
if (updatingRateSpec.equals(this.rateSpec) && !updatingRateSpec.isRestart()) {
return;
}
if (updatingRateSpec.equals(rateSpec) && !updatingRateSpec.isRestart()) return;
this.rateSpec = updatingRateSpec;
this.tokens = (this.tokens == null) ? new ThreadDrivenTokenPool(rateSpec, named) : this.tokens.apply(named, rateSpec);
rateSpec = updatingRateSpec;
tokens = null == this.tokens ? new ThreadDrivenTokenPool(this.rateSpec, this.named) : tokens.apply(this.named, this.rateSpec);
// this.filler = (this.filler == null) ? new TokenFiller(rateSpec, activityDef) : filler.apply(rateSpec);
// this.tokens = this.filler.getTokenPool();
if (this.state == State.Idle && updatingRateSpec.isAutoStart()) {
this.start();
} else if (updatingRateSpec.isRestart()) {
this.restart();
}
if ((State.Idle == this.state) && updatingRateSpec.isAutoStart()) start();
else if (updatingRateSpec.isRestart()) restart();
}
protected void init(NBNamedElement activityDef) {
this.delayGauge = ActivityMetrics.gauge(activityDef, label + ".waittime", new RateLimiters.WaitTimeGauge(this));
this.avgRateGauge = ActivityMetrics.gauge(activityDef, label + ".config.cyclerate", new RateLimiters.RateGauge(this));
this.burstRateGauge = ActivityMetrics.gauge(activityDef, label + ".config.burstrate", new RateLimiters.BurstRateGauge(this));
protected void init(final NBLabeledElement activityDef) {
delayGauge = ActivityMetrics.gauge(activityDef, this.label + ".waittime", new WaitTimeGauge(this));
avgRateGauge = ActivityMetrics.gauge(activityDef, this.label + ".config.cyclerate", new RateGauge(this));
burstRateGauge = ActivityMetrics.gauge(activityDef, this.label + ".config.burstrate", new BurstRateGauge(this));
}
@Override
public synchronized void start() {
switch (state) {
switch (this.state) {
case Started:
// logger.warn("Tried to start a rate limiter that was already started. If this is desired, use restart() instead");
// TODO: Find a better way to warn about spurious rate limiter
// starts, since the check condition was not properly isolated
break;
case Idle:
long nanos = getNanoClockTime();
this.starttime = nanos;
this.tokens.start();
state = State.Started;
final long nanos = this.getNanoClockTime();
starttime = nanos;
tokens.start();
this.state = State.Started;
break;
}
}
public synchronized long restart() {
switch (state) {
switch (this.state) {
case Idle:
this.start();
start();
return 0L;
case Started:
long accumulatedWaitSinceLastStart = cumulativeWaitTimeNanos.get();
cumulativeWaitTimeNanos.set(0L);
return this.tokens.restart() + accumulatedWaitSinceLastStart;
final long accumulatedWaitSinceLastStart = this.cumulativeWaitTimeNanos.get();
this.cumulativeWaitTimeNanos.set(0L);
return tokens.restart() + accumulatedWaitSinceLastStart;
default:
return 0L;
}
@@ -202,9 +200,9 @@ public class HybridRateLimiter implements Startable, RateLimiter {
}
private synchronized void checkpointCumulativeWaitTime() {
long nanos = getNanoClockTime();
this.starttime = nanos;
cumulativeWaitTimeNanos.addAndGet(getWaitTime());
final long nanos = this.getNanoClockTime();
starttime = nanos;
this.cumulativeWaitTimeNanos.addAndGet(this.getWaitTime());
}
protected long getNanoClockTime() {
@@ -213,17 +211,11 @@ public class HybridRateLimiter implements Startable, RateLimiter {
@Override
public String toString() {
StringBuilder sb = new StringBuilder(HybridRateLimiter.class.getSimpleName());
final StringBuilder sb = new StringBuilder(HybridRateLimiter.class.getSimpleName());
sb.append("{\n");
if (this.getRateSpec() != null) {
sb.append(" spec:").append(this.getRateSpec().toString());
}
if (this.tokens != null) {
sb.append(",\n tokenpool:").append(this.tokens.toString());
}
if (this.state != null) {
sb.append(",\n state:'").append(this.state).append("'");
}
if (null != this.getRateSpec()) sb.append(" spec:").append(rateSpec.toString());
if (null != this.tokens) sb.append(",\n tokenpool:").append(tokens);
if (null != this.state) sb.append(",\n state:'").append(state).append('\'');
sb.append("\n}");
return sb.toString();
}
@@ -240,16 +232,14 @@ public class HybridRateLimiter implements Startable, RateLimiter {
private class PoolGauge implements Gauge<Long> {
private final HybridRateLimiter rl;
public PoolGauge(HybridRateLimiter hybridRateLimiter) {
this.rl = hybridRateLimiter;
public PoolGauge(final HybridRateLimiter hybridRateLimiter) {
rl = hybridRateLimiter;
}
@Override
public Long getValue() {
TokenPool pool = rl.tokens;
if (pool==null) {
return 0L;
}
final TokenPool pool = this.rl.tokens;
if (null == pool) return 0L;
return pool.getWaitTime();
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import com.codahale.metrics.Timer;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import org.apache.logging.log4j.LogManager;
@@ -25,6 +26,7 @@ import org.apache.logging.log4j.Logger;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -55,9 +57,10 @@ import static io.nosqlbench.engine.api.util.Colors.*;
*/
public class InlineTokenPool {
private final static Logger logger = LogManager.getLogger(InlineTokenPool.class);
private static final Logger logger = LogManager.getLogger(InlineTokenPool.class);
public static final double MIN_CONCURRENT_OPS = 5;
private final NBLabeledElement parentLabels;
// Size limit of active pool
private long maxActivePoolSize;
@@ -83,7 +86,7 @@ public class InlineTokenPool {
// metrics for refill
private final Timer refillTimer;
// update rate for refiller
private final long interval = (long) 1E6;
private final long interval = (long) 1.0E6;
private RateSpec rateSpec;
@@ -91,10 +94,10 @@ public class InlineTokenPool {
// private long debugRate=1000000000;
// Total number of thread blocks that occured since this token pool was started
private long blocks = 0L;
private long blocks;
private final Lock lock = new ReentrantLock();
private final Condition lockheld = lock.newCondition();
private final Condition lockheld = this.lock.newCondition();
/**
* This constructor tries to pick reasonable defaults for the token pool for
@@ -103,20 +106,22 @@ public class InlineTokenPool {
*
* @param rateSpec a {@link RateSpec}
*/
public InlineTokenPool(RateSpec rateSpec, ActivityDef def) {
ByteBuffer logbuf = getBuffer();
apply(rateSpec);
logger.debug("initialized token pool: " + this + " for rate:" + rateSpec);
this.refillTimer = ActivityMetrics.timer(def, "tokenfiller",4);
public InlineTokenPool(final RateSpec rateSpec, final ActivityDef def, final NBLabeledElement parentLabels) {
this.parentLabels = parentLabels;
final ByteBuffer logbuf = this.getBuffer();
this.apply(rateSpec);
InlineTokenPool.logger.debug("initialized token pool: {} for rate:{}", this, rateSpec);
refillTimer = ActivityMetrics.timer(parentLabels, "tokenfiller",4);
}
public InlineTokenPool(long poolsize, double burstRatio, ActivityDef def) {
ByteBuffer logbuf = getBuffer();
this.maxActivePoolSize = poolsize;
public InlineTokenPool(final long poolsize, final double burstRatio, final ActivityDef def, final NBLabeledElement parentLabels) {
this.parentLabels = parentLabels;
final ByteBuffer logbuf = this.getBuffer();
maxActivePoolSize = poolsize;
this.burstRatio = burstRatio;
this.maxActiveAndBurstSize = (long) (maxActivePoolSize * burstRatio);
this.maxBurstPoolSize = maxActiveAndBurstSize - maxActivePoolSize;
this.refillTimer = ActivityMetrics.timer(def, "tokenfiller",4);
maxActiveAndBurstSize = (long) (this.maxActivePoolSize * burstRatio);
maxBurstPoolSize = this.maxActiveAndBurstSize - this.maxActivePoolSize;
refillTimer = ActivityMetrics.timer(parentLabels, "tokenfiller",4);
}
/**
@@ -125,21 +130,21 @@ public class InlineTokenPool {
*
* @param rateSpec The rate specifier.
*/
public synchronized void apply(RateSpec rateSpec) {
public synchronized void apply(final RateSpec rateSpec) {
this.rateSpec = rateSpec;
// maxActivePool is set to the higher of 1M or however many nanos are needed for 2 ops to be buffered
this.maxActivePoolSize = Math.max((long) 1E6, (long) ((double) rateSpec.getNanosPerOp() * MIN_CONCURRENT_OPS));
this.maxActiveAndBurstSize = (long) (maxActivePoolSize * rateSpec.getBurstRatio());
this.burstRatio = rateSpec.getBurstRatio();
maxActivePoolSize = Math.max((long) 1.0E6, (long) (rateSpec.getNanosPerOp() * InlineTokenPool.MIN_CONCURRENT_OPS));
maxActiveAndBurstSize = (long) (this.maxActivePoolSize * rateSpec.getBurstRatio());
burstRatio = rateSpec.getBurstRatio();
this.maxBurstPoolSize = maxActiveAndBurstSize - maxActivePoolSize;
this.nanosPerOp = rateSpec.getNanosPerOp();
notifyAll();
maxBurstPoolSize = this.maxActiveAndBurstSize - this.maxActivePoolSize;
nanosPerOp = rateSpec.getNanosPerOp();
this.notifyAll();
}
public double getBurstRatio() {
return burstRatio;
return this.burstRatio;
}
/**
@@ -149,9 +154,9 @@ public class InlineTokenPool {
* @param amt tokens requested
* @return actual number of tokens removed, greater to or equal to zero
*/
public synchronized long takeUpTo(long amt) {
long take = Math.min(amt, activePool);
activePool -= take;
public synchronized long takeUpTo(final long amt) {
final long take = Math.min(amt, this.activePool);
this.activePool -= take;
return take;
}
@@ -163,30 +168,23 @@ public class InlineTokenPool {
*/
public long blockAndTake() {
synchronized (this) {
if (activePool >= nanosPerOp) {
activePool -= nanosPerOp;
return waitingPool + activePool;
if (this.activePool >= this.nanosPerOp) {
this.activePool -= this.nanosPerOp;
return this.waitingPool + this.activePool;
}
}
while (true) {
if (lock.tryLock()) {
try {
while (activePool < nanosPerOp) {
dorefill();
}
lockheld.signal();
lockheld.signal();
} finally {
lock.unlock();
}
} else {
try {
lockheld.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
while (true) if (this.lock.tryLock()) try {
while (this.activePool < this.nanosPerOp) this.dorefill();
this.lockheld.signal();
this.lockheld.signal();
} finally {
this.lock.unlock();
}
else try {
this.lockheld.await();
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
// while (activePool < nanosPerOp) {
// blocks++;
// //System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
@@ -205,54 +203,52 @@ public class InlineTokenPool {
// return waitingPool + activePool;
}
public synchronized long blockAndTakeOps(long ops) {
long totalNanosNeeded = ops * nanosPerOp;
while (activePool < totalNanosNeeded) {
blocks++;
public synchronized long blockAndTakeOps(final long ops) {
final long totalNanosNeeded = ops * this.nanosPerOp;
while (this.activePool < totalNanosNeeded) {
this.blocks++;
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
try {
wait();
this.wait();
// wait(maxActivePoolSize / 1000000, (int) maxActivePoolSize % 1000000);
} catch (InterruptedException ignored) {
} catch (Exception e) {
} catch (final InterruptedException ignored) {
} catch (final Exception e) {
throw new RuntimeException(e);
}
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
}
//System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
activePool -= totalNanosNeeded;
return waitingPool + activePool;
this.activePool -= totalNanosNeeded;
return this.waitingPool + this.activePool;
}
public synchronized long blockAndTake(long tokens) {
while (activePool < tokens) {
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
try {
wait();
public synchronized long blockAndTake(final long tokens) {
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
while (this.activePool < tokens) try {
this.wait();
// wait(maxActivePoolSize / 1000000, (int) maxActivePoolSize % 1000000);
} catch (InterruptedException ignored) {
} catch (Exception e) {
throw new RuntimeException(e);
}
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
} catch (final InterruptedException ignored) {
} catch (final Exception e) {
throw new RuntimeException(e);
}
//System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
activePool -= tokens;
return waitingPool + activePool;
this.activePool -= tokens;
return this.waitingPool + this.activePool;
}
public long getWaitTime() {
return activePool + waitingPool;
return this.activePool + this.waitingPool;
}
public long getWaitPool() {
return waitingPool;
return this.waitingPool;
}
public long getActivePool() {
return activePool;
return this.activePool;
}
/**
@@ -269,70 +265,67 @@ public class InlineTokenPool {
* @param newTokens The number of new tokens to add to the token pools
* @return the total number of tokens in all pools
*/
public synchronized long refill(long newTokens) {
boolean debugthis = false;
public synchronized long refill(final long newTokens) {
final boolean debugthis = false;
// long debugAt = System.nanoTime();
// if (debugAt>debugTrigger+debugRate) {
// debugTrigger=debugAt;
// debugthis=true;
// }
long needed = Math.max(maxActivePoolSize - activePool, 0L);
long allocatedToActivePool = Math.min(newTokens, needed);
activePool += allocatedToActivePool;
final long needed = Math.max(this.maxActivePoolSize - this.activePool, 0L);
final long allocatedToActivePool = Math.min(newTokens, needed);
this.activePool += allocatedToActivePool;
// overflow logic
long allocatedToOverflowPool = newTokens - allocatedToActivePool;
waitingPool += allocatedToOverflowPool;
final long allocatedToOverflowPool = newTokens - allocatedToActivePool;
this.waitingPool += allocatedToOverflowPool;
// backfill logic
double refillFactor = Math.min((double) newTokens / maxActivePoolSize, 1.0D);
long burstFillAllowed = (long) (refillFactor * maxBurstPoolSize);
final double refillFactor = Math.min((double) newTokens / this.maxActivePoolSize, 1.0D);
long burstFillAllowed = (long) (refillFactor * this.maxBurstPoolSize);
burstFillAllowed = Math.min(maxActiveAndBurstSize - activePool, burstFillAllowed);
long burstFill = Math.min(burstFillAllowed, waitingPool);
burstFillAllowed = Math.min(this.maxActiveAndBurstSize - this.activePool, burstFillAllowed);
final long burstFill = Math.min(burstFillAllowed, this.waitingPool);
waitingPool -= burstFill;
activePool += burstFill;
this.waitingPool -= burstFill;
this.activePool += burstFill;
if (debugthis) {
System.out.print(this);
System.out.print(ANSI_BrightBlue + " adding=" + allocatedToActivePool);
if (allocatedToOverflowPool > 0) {
if (0 < allocatedToOverflowPool)
System.out.print(ANSI_Red + " OVERFLOW:" + allocatedToOverflowPool + ANSI_Reset);
}
if (burstFill > 0) {
System.out.print(ANSI_BrightGreen + " BACKFILL:" + burstFill + ANSI_Reset);
}
if (0 < burstFill) System.out.print(ANSI_BrightGreen + " BACKFILL:" + burstFill + ANSI_Reset);
System.out.println();
}
//System.out.println(this);
notifyAll();
this.notifyAll();
return activePool + waitingPool;
return this.activePool + this.waitingPool;
}
@Override
public String toString() {
return "Tokens: active=" + activePool + "/" + maxActivePoolSize
return "Tokens: active=" + this.activePool + '/' + this.maxActivePoolSize
+ String.format(
" (%3.1f%%)A (%3.1f%%)B ",
(((double) activePool / (double) maxActivePoolSize) * 100.0),
(((double) activePool / (double) maxActiveAndBurstSize) * 100.0)) + " waiting=" + waitingPool +
" blocks=" + blocks +
" rateSpec:" + ((rateSpec != null) ? rateSpec.toString() : "NULL");
(double) this.activePool / this.maxActivePoolSize * 100.0,
(double) this.activePool / this.maxActiveAndBurstSize * 100.0) + " waiting=" + this.waitingPool +
" blocks=" + this.blocks +
" rateSpec:" + (null != rateSpec ? this.rateSpec.toString() : "NULL");
}
public RateSpec getRateSpec() {
return rateSpec;
return this.rateSpec;
}
public synchronized long restart() {
long wait = activePool + waitingPool;
activePool = 0L;
waitingPool = 0L;
final long wait = this.activePool + this.waitingPool;
this.activePool = 0L;
this.waitingPool = 0L;
return wait;
}
@@ -340,33 +333,33 @@ public class InlineTokenPool {
RandomAccessFile image = null;
try {
image = new RandomAccessFile("tokenbucket.binlog", "rw");
ByteBuffer mbb = image.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, image.length());
final ByteBuffer mbb = image.getChannel().map(MapMode.READ_WRITE, 0, image.length());
return mbb;
} catch (Exception e) {
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public synchronized void dorefill() {
lastRefillAt = System.nanoTime();
long nextRefillTime = lastRefillAt + interval;
this.lastRefillAt = System.nanoTime();
final long nextRefillTime = this.lastRefillAt + this.interval;
long thisRefillTime = System.nanoTime();
while (thisRefillTime < nextRefillTime) {
// while (thisRefillTime < lastRefillAt + interval) {
long parkfor = Math.max(nextRefillTime - thisRefillTime, 0L);
final long parkfor = Math.max(nextRefillTime - thisRefillTime, 0L);
//System.out.println(ANSI_Blue + "parking for " + parkfor + "ns" + ANSI_Reset);
LockSupport.parkNanos(parkfor);
thisRefillTime = System.nanoTime();
}
// this.times[iteration]=thisRefillTime;
long delta = thisRefillTime - lastRefillAt;
final long delta = thisRefillTime - this.lastRefillAt;
// this.amounts[iteration]=delta;
lastRefillAt = thisRefillTime;
this.lastRefillAt = thisRefillTime;
//System.out.println(this);
refill(delta);
refillTimer.update(delta, TimeUnit.NANOSECONDS);
this.refill(delta);
this.refillTimer.update(delta, TimeUnit.NANOSECONDS);
// iteration++;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,68 +17,68 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import com.codahale.metrics.Gauge;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.api.config.NBLabeledElement;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class RateLimiters {
private final static Logger logger = LogManager.getLogger(RateLimiters.class);
public enum RateLimiters {
;
private static final Logger logger = LogManager.getLogger(RateLimiters.class);
public static synchronized RateLimiter createOrUpdate(NBNamedElement def, String label, RateLimiter extant, RateSpec spec) {
public static synchronized RateLimiter createOrUpdate(final NBLabeledElement def, final String label, final RateLimiter extant, final RateSpec spec) {
if (extant == null) {
RateLimiter rateLimiter= new HybridRateLimiter(def, label, spec);
if (null == extant) {
final RateLimiter rateLimiter= new HybridRateLimiter(def, label, spec);
logger.info(() -> "Using rate limiter: " + rateLimiter);
RateLimiters.logger.info(() -> "Using rate limiter: " + rateLimiter);
return rateLimiter;
} else {
extant.applyRateSpec(spec);
logger.info(() -> "Updated rate limiter: " + extant);
return extant;
}
extant.applyRateSpec(spec);
RateLimiters.logger.info(() -> "Updated rate limiter: " + extant);
return extant;
}
public static synchronized RateLimiter create(NBNamedElement def, String label, String specString) {
return createOrUpdate(def, label, null, new RateSpec(specString));
public static synchronized RateLimiter create(final NBLabeledElement def, final String label, final String specString) {
return RateLimiters.createOrUpdate(def, label, null, new RateSpec(specString));
}
public static class WaitTimeGauge implements Gauge<Long> {
private final RateLimiter rateLimiter;
public WaitTimeGauge(RateLimiter rateLimiter) {
public WaitTimeGauge(final RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}
@Override
public Long getValue() {
return rateLimiter.getTotalWaitTime();
return this.rateLimiter.getTotalWaitTime();
}
}
public static class RateGauge implements Gauge<Double> {
private final RateLimiter rateLimiter;
public RateGauge(RateLimiter rateLimiter) {
public RateGauge(final RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}
@Override
public Double getValue() {
return rateLimiter.getRateSpec().opsPerSec;
return this.rateLimiter.getRateSpec().opsPerSec;
}
}
public static class BurstRateGauge implements Gauge<Double> {
private final RateLimiter rateLimiter;
public BurstRateGauge(RateLimiter rateLimiter) {
public BurstRateGauge(final RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}
@Override
public Double getValue() {
return rateLimiter.getRateSpec().getBurstRatio() * rateLimiter.getRateSpec().getRate();
return this.rateLimiter.getRateSpec().getBurstRatio() * this.rateLimiter.getRateSpec().getRate();
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -46,7 +46,7 @@ import static io.nosqlbench.engine.api.util.Colors.*;
@Service(value= TokenPool.class, selector="threaded")
public class ThreadDrivenTokenPool implements TokenPool {
private final static Logger logger = LogManager.getLogger(ThreadDrivenTokenPool.class);
private static final Logger logger = LogManager.getLogger(ThreadDrivenTokenPool.class);
public static final double MIN_CONCURRENT_OPS = 2;
@@ -59,7 +59,7 @@ public class ThreadDrivenTokenPool implements TokenPool {
private volatile long waitingPool;
private RateSpec rateSpec;
private long nanosPerOp;
private long blocks = 0L;
private long blocks;
private TokenFiller filler;
@@ -70,9 +70,9 @@ public class ThreadDrivenTokenPool implements TokenPool {
*
* @param rateSpec a {@link RateSpec}
*/
public ThreadDrivenTokenPool(RateSpec rateSpec, NBNamedElement named) {
apply(named,rateSpec);
logger.debug(() -> "initialized token pool: " + this + " for rate:" + rateSpec);
public ThreadDrivenTokenPool(final RateSpec rateSpec, final NBLabeledElement named) {
this.apply(named,rateSpec);
ThreadDrivenTokenPool.logger.debug(() -> "initialized token pool: " + this + " for rate:" + rateSpec);
// filler.start();
}
@@ -83,23 +83,23 @@ public class ThreadDrivenTokenPool implements TokenPool {
* @param rateSpec The rate specifier.
*/
@Override
public synchronized TokenPool apply(NBNamedElement named, RateSpec rateSpec) {
public synchronized TokenPool apply(final NBLabeledElement labeled, final RateSpec rateSpec) {
this.rateSpec = rateSpec;
this.maxActivePool = Math.max((long) 1E6, (long) ((double) rateSpec.getNanosPerOp() * MIN_CONCURRENT_OPS));
this.maxOverActivePool = (long) (maxActivePool * rateSpec.getBurstRatio());
this.burstRatio = rateSpec.getBurstRatio();
maxActivePool = Math.max((long) 1.0E6, (long) (rateSpec.getNanosPerOp() * ThreadDrivenTokenPool.MIN_CONCURRENT_OPS));
maxOverActivePool = (long) (this.maxActivePool * rateSpec.getBurstRatio());
burstRatio = rateSpec.getBurstRatio();
this.burstPoolSize = maxOverActivePool - maxActivePool;
this.nanosPerOp = rateSpec.getNanosPerOp();
this.filler = (this.filler == null) ? new TokenFiller(rateSpec, this, named, 3) : filler.apply(rateSpec);
notifyAll();
burstPoolSize = this.maxOverActivePool - this.maxActivePool;
nanosPerOp = rateSpec.getNanosPerOp();
filler = null == this.filler ? new TokenFiller(rateSpec, this, labeled, 3) : this.filler.apply(rateSpec);
this.notifyAll();
return this;
}
@Override
public double getBurstRatio() {
return burstRatio;
return this.burstRatio;
}
/**
@@ -110,9 +110,9 @@ public class ThreadDrivenTokenPool implements TokenPool {
* @return actual number of tokens removed, greater to or equal to zero
*/
@Override
public synchronized long takeUpTo(long amt) {
long take = Math.min(amt, activePool);
activePool -= take;
public synchronized long takeUpTo(final long amt) {
final long take = Math.min(amt, this.activePool);
this.activePool -= take;
return take;
}
@@ -124,55 +124,53 @@ public class ThreadDrivenTokenPool implements TokenPool {
*/
@Override
public synchronized long blockAndTake() {
while (activePool < nanosPerOp) {
blocks++;
while (this.activePool < this.nanosPerOp) {
this.blocks++;
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
try {
wait(1000);
this.wait(1000);
// wait(maxActivePool / 1000000, 0);
} catch (InterruptedException ignored) {
} catch (Exception e) {
} catch (final InterruptedException ignored) {
} catch (final Exception e) {
throw new RuntimeException(e);
}
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
}
//System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
activePool -= nanosPerOp;
return waitingPool + activePool;
this.activePool -= this.nanosPerOp;
return this.waitingPool + this.activePool;
}
@Override
public synchronized long blockAndTake(long tokens) {
while (activePool < tokens) {
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
try {
wait(maxActivePool / 1000000, (int) maxActivePool % 1000000);
} catch (InterruptedException ignored) {
} catch (Exception e) {
throw new RuntimeException(e);
}
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
public synchronized long blockAndTake(final long tokens) {
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
while (this.activePool < tokens) try {
this.wait(this.maxActivePool / 1000000, (int) this.maxActivePool % 1000000);
} catch (final InterruptedException ignored) {
} catch (final Exception e) {
throw new RuntimeException(e);
}
//System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
activePool -= tokens;
return waitingPool + activePool;
this.activePool -= tokens;
return this.waitingPool + this.activePool;
}
@Override
public long getWaitTime() {
return activePool + waitingPool;
return this.activePool + this.waitingPool;
}
@Override
public long getWaitPool() {
return waitingPool;
return this.waitingPool;
}
@Override
public long getActivePool() {
return activePool;
return this.activePool;
}
/**
@@ -189,77 +187,74 @@ public class ThreadDrivenTokenPool implements TokenPool {
* @param newTokens The number of new tokens to add to the token pools
* @return the total number of tokens in all pools
*/
public synchronized long refill(long newTokens) {
boolean debugthis = false;
public synchronized long refill(final long newTokens) {
final boolean debugthis = false;
// long debugAt = System.nanoTime();
// if (debugAt>debugTrigger+debugRate) {
// debugTrigger=debugAt;
// debugthis=true;
// }
long needed = Math.max(maxActivePool - activePool, 0L);
long allocatedToActivePool = Math.min(newTokens, needed);
activePool += allocatedToActivePool;
final long needed = Math.max(this.maxActivePool - this.activePool, 0L);
final long allocatedToActivePool = Math.min(newTokens, needed);
this.activePool += allocatedToActivePool;
// overflow logic
long allocatedToOverflowPool = newTokens - allocatedToActivePool;
waitingPool += allocatedToOverflowPool;
final long allocatedToOverflowPool = newTokens - allocatedToActivePool;
this.waitingPool += allocatedToOverflowPool;
// backfill logic
double refillFactor = Math.min((double) newTokens / maxActivePool, 1.0D);
long burstFillAllowed = (long) (refillFactor * burstPoolSize);
final double refillFactor = Math.min((double) newTokens / this.maxActivePool, 1.0D);
long burstFillAllowed = (long) (refillFactor * this.burstPoolSize);
burstFillAllowed = Math.min(maxOverActivePool - activePool, burstFillAllowed);
long burstFill = Math.min(burstFillAllowed, waitingPool);
burstFillAllowed = Math.min(this.maxOverActivePool - this.activePool, burstFillAllowed);
final long burstFill = Math.min(burstFillAllowed, this.waitingPool);
waitingPool -= burstFill;
activePool += burstFill;
this.waitingPool -= burstFill;
this.activePool += burstFill;
if (debugthis) {
System.out.print(this);
System.out.print(ANSI_BrightBlue + " adding=" + allocatedToActivePool);
if (allocatedToOverflowPool > 0) {
if (0 < allocatedToOverflowPool)
System.out.print(ANSI_Red + " OVERFLOW:" + allocatedToOverflowPool + ANSI_Reset);
}
if (burstFill > 0) {
System.out.print(ANSI_BrightGreen + " BACKFILL:" + burstFill + ANSI_Reset);
}
if (0 < burstFill) System.out.print(ANSI_BrightGreen + " BACKFILL:" + burstFill + ANSI_Reset);
System.out.println();
}
//System.out.println(this);
notifyAll();
this.notifyAll();
return activePool + waitingPool;
return this.activePool + this.waitingPool;
}
@Override
public String toString() {
return String.format(
"{ active:%d, max:%d, fill:'(%,3.1f%%)A (%,3.1f%%)B', wait_ns:%,d, blocks:%,d }",
activePool, maxActivePool,
(((double) activePool / (double) maxActivePool) * 100.0),
(((double) activePool / (double) maxOverActivePool) * 100.0),
waitingPool,
blocks
this.activePool, this.maxActivePool,
(double) this.activePool / this.maxActivePool * 100.0,
(double) this.activePool / this.maxOverActivePool * 100.0,
this.waitingPool,
this.blocks
);
}
@Override
public RateSpec getRateSpec() {
return rateSpec;
return this.rateSpec;
}
@Override
public synchronized long restart() {
long wait = activePool + waitingPool;
activePool = 0L;
waitingPool = 0L;
final long wait = this.activePool + this.waitingPool;
this.activePool = 0L;
this.waitingPool = 0L;
return wait;
}
@Override
public synchronized void start() {
filler.start();
this.filler.start();
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,7 +17,7 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import com.codahale.metrics.Timer;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -26,13 +26,13 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class TokenFiller implements Runnable {
private final static Logger logger = LogManager.getLogger(TokenFiller.class);
private static final Logger logger = LogManager.getLogger(TokenFiller.class);
public final static double MIN_PER_SECOND = 10D;
public final static double MAX_PER_SECOND = 1000D;
public static final double MIN_PER_SECOND = 10.0D;
public static final double MAX_PER_SECOND = 1000.0D;
// private final SysPerfData PERFDATA = SysPerf.get().getPerfData
// (false);
private final long interval = (long) 1E5;
private final long interval = (long) 1.0E5;
private final ThreadDrivenTokenPool tokenPool;
private volatile boolean running = true;
@@ -47,34 +47,34 @@ public class TokenFiller implements Runnable {
* in the JVM.
*
*/
public TokenFiller(RateSpec rateSpec, ThreadDrivenTokenPool tokenPool, NBNamedElement named, int hdrdigits) {
public TokenFiller(final RateSpec rateSpec, final ThreadDrivenTokenPool tokenPool, final NBLabeledElement labeled, final int hdrdigits) {
this.rateSpec = rateSpec;
this.tokenPool = tokenPool;
this.timer = ActivityMetrics.timer(named, "tokenfiller", hdrdigits);
timer = ActivityMetrics.timer(labeled, "tokenfiller", hdrdigits);
}
public TokenFiller apply(RateSpec rateSpec) {
public TokenFiller apply(final RateSpec rateSpec) {
this.rateSpec = rateSpec;
return this;
}
private void stop() {
this.running=false;
running=false;
}
public TokenPool getTokenPool() {
return tokenPool;
return this.tokenPool;
}
@Override
public void run() {
lastRefillAt = System.nanoTime();
while (running) {
long nextRefillTime = lastRefillAt + interval;
this.lastRefillAt = System.nanoTime();
while (this.running) {
final long nextRefillTime = this.lastRefillAt + this.interval;
long thisRefillTime = System.nanoTime();
while (thisRefillTime < nextRefillTime) {
// while (thisRefillTime < lastRefillAt + interval) {
long parkfor = Math.max(nextRefillTime - thisRefillTime, 0L);
final long parkfor = Math.max(nextRefillTime - thisRefillTime, 0L);
// System.out.println(ANSI_Blue + " parking for " + parkfor + "ns" + ANSI_Reset); System.out.flush();
LockSupport.parkNanos(parkfor);
// System.out.println(ANSI_Blue + "unparking for " + parkfor + "ns" + ANSI_Reset); System.out.flush();
@@ -82,33 +82,33 @@ public class TokenFiller implements Runnable {
}
// this.times[iteration]=thisRefillTime;
long delta = thisRefillTime - lastRefillAt;
final long delta = thisRefillTime - this.lastRefillAt;
// this.amounts[iteration]=delta;
lastRefillAt = thisRefillTime;
this.lastRefillAt = thisRefillTime;
// System.out.println(ANSI_Blue + this + ANSI_Reset); System.out.flush();
tokenPool.refill(delta);
timer.update(delta, TimeUnit.NANOSECONDS);
this.tokenPool.refill(delta);
this.timer.update(delta, TimeUnit.NANOSECONDS);
// iteration++;
}
}
public synchronized TokenFiller start() {
this.tokenPool.refill(rateSpec.getNanosPerOp());
tokenPool.refill(this.rateSpec.getNanosPerOp());
thread = new Thread(this);
thread.setName(this.toString());
thread.setPriority(Thread.MAX_PRIORITY);
thread.setDaemon(true);
thread.start();
logger.debug("Starting token filler thread: " + this);
this.thread = new Thread(this);
this.thread.setName(toString());
this.thread.setPriority(Thread.MAX_PRIORITY);
this.thread.setDaemon(true);
this.thread.start();
TokenFiller.logger.debug("Starting token filler thread: {}", this);
return this;
}
@Override
public String toString() {
return "TokenFiller spec=" + rateSpec + " interval=" + this.interval + "ns pool:" + tokenPool +" running=" + running;
return "TokenFiller spec=" + this.rateSpec + " interval=" + interval + "ns pool:" + this.tokenPool +" running=" + this.running;
}
// public String getRefillLog() {
@@ -120,9 +120,9 @@ public class TokenFiller implements Runnable {
// }
public synchronized long restart() {
this.lastRefillAt=System.nanoTime();
logger.debug("Restarting token filler at " + lastRefillAt + " thread: " + this);
long wait = this.tokenPool.restart();
lastRefillAt=System.nanoTime();
TokenFiller.logger.debug("Restarting token filler at {} thread: {}", this.lastRefillAt, this);
final long wait = tokenPool.restart();
return wait;
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,11 +16,11 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.api.config.NBLabeledElement;
public interface TokenPool {
TokenPool apply(NBNamedElement named, RateSpec rateSpec);
TokenPool apply(NBLabeledElement labeled, RateSpec rateSpec);
double getBurstRatio();

View File

@@ -17,6 +17,8 @@
package io.nosqlbench.engine.api.activityimpl;
import com.codahale.metrics.Timer;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
@@ -54,6 +56,7 @@ import org.apache.logging.log4j.Logger;
import java.io.InputStream;
import java.io.PrintWriter;
import java.lang.reflect.AnnotatedType;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -62,8 +65,9 @@ import java.util.stream.Collectors;
/**
* A default implementation of an Activity, suitable for building upon.
*/
public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObserver {
private final static Logger logger = LogManager.getLogger("ACTIVITY");
public class SimpleActivity implements Activity {
private static final Logger logger = LogManager.getLogger("ACTIVITY");
private final NBLabeledElement parentLabels;
protected ActivityDef activityDef;
private final List<AutoCloseable> closeables = new ArrayList<>();
@@ -80,15 +84,18 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
private ActivityInstrumentation activityInstrumentation;
private PrintWriter console;
private long startedAtMillis;
private int nameEnumerator = 0;
private int nameEnumerator;
private ErrorMetrics errorMetrics;
private NBErrorHandler errorHandler;
private ActivityMetricProgressMeter progressMeter;
private String workloadSource = "unspecified";
private final RunStateTally tally = new RunStateTally();
private final NBLabels labels;
public SimpleActivity(ActivityDef activityDef) {
public SimpleActivity(ActivityDef activityDef, NBLabeledElement parentLabels) {
labels = parentLabels.getLabels().and("activity",activityDef.getAlias());
this.activityDef = activityDef;
this.parentLabels = parentLabels;
if (activityDef.getAlias().equals(ActivityDef.DEFAULT_ALIAS)) {
Optional<String> workloadOpt = activityDef.getParams().getOptionalString(
"workload",
@@ -99,13 +106,14 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
} else {
activityDef.getParams().set("alias",
activityDef.getActivityType().toUpperCase(Locale.ROOT)
+ nameEnumerator++);
+ nameEnumerator);
nameEnumerator++;
}
}
}
public SimpleActivity(String activityDefString) {
this(ActivityDef.parseActivityDef(activityDefString));
public SimpleActivity(String activityDefString, NBLabeledElement parentLabels) {
this(ActivityDef.parseActivityDef(activityDefString),parentLabels);
}
@Override
@@ -114,7 +122,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
}
public synchronized NBErrorHandler getErrorHandler() {
if (errorHandler == null) {
if (null == this.errorHandler) {
errorHandler = new NBErrorHandler(
() -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
() -> getExceptionMetrics());
@@ -122,13 +130,15 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
return errorHandler;
}
@Override
public synchronized RunState getRunState() {
return runState;
}
@Override
public synchronized void setRunState(RunState runState) {
this.runState = runState;
if (runState == RunState.Running) {
if (RunState.Running == runState) {
this.startedAtMillis = System.currentTimeMillis();
}
}
@@ -194,7 +204,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
}
public String toString() {
return getAlias() + ":" + getRunState() + ":" + getRunStateTally().toString();
return getAlias() + ':' + this.runState + ':' + this.tally;
}
@Override
@@ -243,7 +253,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
@Override
public synchronized RateLimiter getCycleRateLimiter(Supplier<? extends RateLimiter> s) {
if (cycleLimiter == null) {
if (null == this.cycleLimiter) {
cycleLimiter = s.get();
}
return cycleLimiter;
@@ -261,7 +271,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
@Override
public synchronized RateLimiter getStrideRateLimiter(Supplier<? extends RateLimiter> s) {
if (strideLimiter == null) {
if (null == this.strideLimiter) {
strideLimiter = s.get();
}
return strideLimiter;
@@ -275,7 +285,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
@Override
public Timer getResultTimer() {
return ActivityMetrics.timer(getActivityDef(), "result", getParams().getOptionalInteger("hdr_digits").orElse(4));
return ActivityMetrics.timer(this, "result", getParams().getOptionalInteger("hdr_digits").orElse(4));
}
@Override
@@ -285,7 +295,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
@Override
public synchronized RateLimiter getPhaseRateLimiter(Supplier<? extends RateLimiter> supplier) {
if (phaseLimiter == null) {
if (null == this.phaseLimiter) {
phaseLimiter = supplier.get();
}
return phaseLimiter;
@@ -293,7 +303,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
@Override
public synchronized ActivityInstrumentation getInstrumentation() {
if (activityInstrumentation == null) {
if (null == this.activityInstrumentation) {
activityInstrumentation = new CoreActivityInstrumentation(this);
}
return activityInstrumentation;
@@ -301,8 +311,8 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
@Override
public synchronized PrintWriter getConsoleOut() {
if (this.console == null) {
this.console = new PrintWriter(System.out);
if (null == console) {
this.console = new PrintWriter(System.out, false, StandardCharsets.UTF_8);
}
return this.console;
}
@@ -319,8 +329,8 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
@Override
public synchronized ErrorMetrics getExceptionMetrics() {
if (errorMetrics == null) {
errorMetrics = new ErrorMetrics(this.getActivityDef());
if (null == this.errorMetrics) {
errorMetrics = new ErrorMetrics(this);
}
return errorMetrics;
}
@@ -334,15 +344,15 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
activityDef.getParams().getOptionalNamedParameter("striderate")
.map(RateSpec::new)
.ifPresent(spec -> strideLimiter = RateLimiters.createOrUpdate(this.getActivityDef(), "strides", strideLimiter, spec));
.ifPresent(spec -> strideLimiter = RateLimiters.createOrUpdate(this, "strides", strideLimiter, spec));
activityDef.getParams().getOptionalNamedParameter("cyclerate", "targetrate", "rate")
.map(RateSpec::new).ifPresent(
spec -> cycleLimiter = RateLimiters.createOrUpdate(this.getActivityDef(), "cycles", cycleLimiter, spec));
spec -> cycleLimiter = RateLimiters.createOrUpdate(this, "cycles", cycleLimiter, spec));
activityDef.getParams().getOptionalNamedParameter("phaserate")
.map(RateSpec::new)
.ifPresent(spec -> phaseLimiter = RateLimiters.createOrUpdate(this.getActivityDef(), "phases", phaseLimiter, spec));
.ifPresent(spec -> phaseLimiter = RateLimiters.createOrUpdate(this, "phases", phaseLimiter, spec));
}
@@ -369,13 +379,13 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
// getParams().set("cycles", getParams().getOptionalString("stride").orElseThrow());
getParams().setSilently("cycles", getParams().getOptionalString("stride").orElseThrow());
} else {
if (getActivityDef().getCycleCount() == 0) {
if (0 == activityDef.getCycleCount()) {
throw new RuntimeException(
"You specified cycles, but the range specified means zero cycles: " + getParams().get("cycles")
);
}
long stride = getParams().getOptionalLong("stride").orElseThrow();
long cycles = getActivityDef().getCycleCount();
long cycles = this.activityDef.getCycleCount();
if (cycles < stride) {
throw new RuntimeException(
"The specified cycles (" + cycles + ") are less than the stride (" + stride + "). This means there aren't enough cycles to cause a stride to be executed." +
@@ -384,25 +394,25 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
}
}
long cycleCount = getActivityDef().getCycleCount();
long stride = getActivityDef().getParams().getOptionalLong("stride").orElseThrow();
long cycleCount = this.activityDef.getCycleCount();
long stride = this.activityDef.getParams().getOptionalLong("stride").orElseThrow();
if (stride > 0 && (cycleCount % stride) != 0) {
if (0 < stride && 0 != cycleCount % stride) {
logger.warn(() -> "The stride does not evenly divide cycles. Only full strides will be executed," +
"leaving some cycles unused. (stride=" + stride + ", cycles=" + cycleCount + ")");
"leaving some cycles unused. (stride=" + stride + ", cycles=" + cycleCount + ')');
}
Optional<String> threadSpec = activityDef.getParams().getOptionalString("threads");
if (threadSpec.isPresent()) {
String spec = threadSpec.get();
int processors = Runtime.getRuntime().availableProcessors();
if (spec.equalsIgnoreCase("auto")) {
if ("auto".equalsIgnoreCase(spec)) {
int threads = processors * 10;
if (threads > activityDef.getCycleCount()) {
threads = (int) activityDef.getCycleCount();
logger.info("setting threads to " + threads + " (auto) [10xCORES, cycle count limited]");
logger.info("setting threads to {} (auto) [10xCORES, cycle count limited]", threads);
} else {
logger.info("setting threads to " + threads + " (auto) [10xCORES]");
logger.info("setting threads to {} (auto) [10xCORES]", threads);
}
// activityDef.setThreads(threads);
activityDef.getParams().setSilently("threads", threads);
@@ -423,18 +433,15 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
+ ", you should have more cycles than threads.");
}
} else {
if (cycleCount > 1000) {
logger.warn(() -> "For testing at scale, it is highly recommended that you " +
"set threads to a value higher than the default of 1." +
" hint: you can use threads=auto for reasonable default, or" +
" consult the topic on threads with `help threads` for" +
" more information.");
}
} else if (1000 < cycleCount) {
logger.warn(() -> "For testing at scale, it is highly recommended that you " +
"set threads to a value higher than the default of 1." +
" hint: you can use threads=auto for reasonable default, or" +
" consult the topic on threads with `help threads` for" +
" more information.");
}
if (activityDef.getCycleCount() > 0 && seq.getOps().size() == 0) {
if (0 < this.activityDef.getCycleCount() && 0 == seq.getOps().size()) {
throw new BasicError("You have configured a zero-length sequence and non-zero cycles. Tt is not possible to continue with this activity.");
}
}
@@ -443,7 +450,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
* Given a function that can create an op of type <O> from a CommandTemplate, generate
* an indexed sequence of ready to call operations.
*
* This method works almost exactly like the {@link #createOpSequenceFromCommands(Function, boolean)},
* This method works almost exactly like the ,
* except that it uses the {@link CommandTemplate} semantics, which are more general and allow
* for map-based specification of operations with bindings in each field.
*
@@ -491,12 +498,12 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
for (int i = 0; i < pops.size(); i++) {
long ratio = ratios.get(i);
ParsedOp pop = pops.get(i);
if (ratio == 0) {
logger.info(() -> "skipped mapping op '" + pop.getName() + "'");
if (0 == ratio) {
logger.info(() -> "skipped mapping op '" + pop.getName() + '\'');
continue;
}
String dryrunSpec = pop.takeStaticConfigOr("dryrun", "none");
boolean dryrun = dryrunSpec.equalsIgnoreCase("op");
boolean dryrun = "op".equalsIgnoreCase(dryrunSpec);
DriverAdapter adapter = adapters.get(i);
OpMapper opMapper = adapter.getOpMapper();
@@ -512,8 +519,8 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
// }
planner.addOp((OpDispenser<? extends O>) dispenser, ratio);
}
if (dryrunCount > 0) {
logger.warn("initialized " + dryrunCount + " op templates for dry run only. These ops will be synthesized for each cycle, but will not be executed.");
if (0 < dryrunCount) {
logger.warn("initialized {} op templates for dry run only. These ops will be synthesized for each cycle, but will not be executed.", dryrunCount);
}
@@ -533,7 +540,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
List<Function<Map<String, Object>, Map<String, Object>>> parsers,
boolean strict
) {
Function<OpTemplate, ParsedOp> f = t -> new ParsedOp(t, cfg, parsers);
Function<OpTemplate, ParsedOp> f = t -> new ParsedOp(t, cfg, parsers, this);
Function<OpTemplate, OpDispenser<? extends O>> opTemplateOFunction = f.andThen(opinit);
return createOpSequence(opTemplateOFunction, strict, Optional.empty());
@@ -541,7 +548,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
protected List<ParsedOp> loadParsedOps(NBConfiguration cfg, Optional<DriverAdapter> defaultAdapter) {
List<ParsedOp> parsedOps = loadOpTemplates(defaultAdapter).stream().map(
ot -> new ParsedOp(ot, cfg, List.of())
ot -> new ParsedOp(ot, cfg, List.of(), this)
).toList();
return parsedOps;
}
@@ -555,35 +562,35 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
List<OpTemplate> unfilteredOps = opsDocList.getOps();
List<OpTemplate> filteredOps = opsDocList.getOps(tagfilter);
if (filteredOps.size() == 0) {
if (unfilteredOps.size() > 0) { // There were no ops, and it was because they were all filtered out
if (0 == filteredOps.size()) {
// There were no ops, and it *wasn't* because they were all filtered out.
// In this case, let's try to synthesize the ops as long as at least a default driver was provided
// But if there were no ops, and there was no default driver provided, we can't continue
// There were no ops, and it was because they were all filtered out
if (0 < unfilteredOps.size()) {
throw new BasicError("There were no active op templates with tag filter '"
+ tagfilter + "', since all " + unfilteredOps.size() + " were filtered out.");
} else {
// There were no ops, and it *wasn't* because they were all filtered out.
// In this case, let's try to synthesize the ops as long as at least a default driver was provided
if (defaultDriverAdapter.isPresent() && defaultDriverAdapter.get() instanceof SyntheticOpTemplateProvider sotp) {
filteredOps = sotp.getSyntheticOpTemplates(opsDocList, getActivityDef().getParams());
Objects.requireNonNull(filteredOps);
if (filteredOps.size() == 0) {
throw new BasicError("Attempted to create synthetic ops from driver '" + defaultDriverAdapter.get().getAdapterName() + "'" +
" but no ops were created. You must provide either a workload or an op parameter. Activities require op templates.");
}
} else { // But if there were no ops, and there was no default driver provided, we can't continue
throw new BasicError("""
No op templates were provided. You must provide one of these activity parameters:
1) workload=some.yaml
2) op='inline template'
3) driver=stdout (or any other drive that can synthesize ops)""");
}
}
if (filteredOps.size() == 0) {
throw new BasicError("There were no active op templates with tag filter '" + tagfilter + "'");
if (defaultDriverAdapter.isPresent() && defaultDriverAdapter.get() instanceof SyntheticOpTemplateProvider sotp) {
filteredOps = sotp.getSyntheticOpTemplates(opsDocList, this.activityDef.getParams());
Objects.requireNonNull(filteredOps);
if (0 == filteredOps.size()) {
throw new BasicError("Attempted to create synthetic ops from driver '" + defaultDriverAdapter.get().getAdapterName() + '\'' +
" but no ops were created. You must provide either a workload or an op parameter. Activities require op templates.");
}
} else {
throw new BasicError("""
No op templates were provided. You must provide one of these activity parameters:
1) workload=some.yaml
2) op='inline template'
3) driver=stdout (or any other drive that can synthesize ops)""");
}
if (0 == filteredOps.size()) {
throw new BasicError("There were no active op templates with tag filter '" + tagfilter + '\'');
}
}
if (filteredOps.size() == 0) {
if (0 == filteredOps.size()) {
throw new OpConfigError("No op templates found. You must provide either workload=... or op=..., or use " +
"a default driver (driver=___). This includes " +
ServiceLoader.load(DriverAdapter.class).stream()
@@ -670,7 +677,8 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
if (stmt.isPresent()) {
workloadSource = "commandline:" + stmt.get();
return OpsLoader.loadString(stmt.get(), OpTemplateFormat.inline, activityDef.getParams(), null);
} else if (op_yaml_loc.isPresent()) {
}
if (op_yaml_loc.isPresent()) {
workloadSource = "yaml:" + op_yaml_loc.get();
return OpsLoader.loadPath(op_yaml_loc.get(), activityDef.getParams(), "activities");
}
@@ -685,7 +693,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
@Override
public synchronized ProgressMeterDisplay getProgressMeter() {
if (progressMeter == null) {
if (null == this.progressMeter) {
this.progressMeter = new ActivityMetricProgressMeter(this);
}
return this.progressMeter;
@@ -700,7 +708,7 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
*/
@Override
public int getMaxTries() {
return getActivityDef().getParams().getOptionalInteger("maxtries").orElse(10);
return this.activityDef.getParams().getOptionalInteger("maxtries").orElse(10);
}
@Override
@@ -708,9 +716,8 @@ public class SimpleActivity implements Activity, ProgressCapable, ActivityDefObs
return tally;
}
@Override
public String getName() {
return this.activityDef.getAlias();
public NBLabels getLabels() {
return this.labels;
}
}

View File

@@ -16,6 +16,8 @@
package io.nosqlbench.engine.api.activityimpl.uniform;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.api.config.standard.*;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.errors.BasicError;
@@ -46,15 +48,15 @@ import java.util.concurrent.ConcurrentHashMap;
* @param <S> The context type for the activity, AKA the 'space' for a named driver instance and its associated object graph
*/
public class StandardActivity<R extends Op, S> extends SimpleActivity implements SyntheticOpTemplateProvider {
private final static Logger logger = LogManager.getLogger("ACTIVITY");
private static final Logger logger = LogManager.getLogger("ACTIVITY");
private final OpSequence<OpDispenser<? extends Op>> sequence;
private final NBConfigModel yamlmodel;
private final ConcurrentHashMap<String, DriverAdapter> adapters = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, OpMapper<Op>> mappers = new ConcurrentHashMap<>();
public StandardActivity(ActivityDef activityDef) {
super(activityDef);
public StandardActivity(ActivityDef activityDef, NBLabeledElement parentLabels) {
super(activityDef, parentLabels);
OpsDocList workload;
Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
@@ -72,7 +74,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
.flatMap(s -> ServiceSelector.of(s, adapterLoader).get());
if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) {
throw new BasicError("Unable to load default driver adapter '" + defaultDriverName.get() + "'");
throw new BasicError("Unable to load default driver adapter '" + defaultDriverName.get() + '\'');
}
// HERE, op templates are loaded before drivers are loaded
@@ -85,7 +87,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
Optional<String> defaultDriverOption = activityDef.getParams().getOptionalString("driver");
for (OpTemplate ot : opTemplates) {
ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(), List.of());
ParsedOp incompleteOpDef = new ParsedOp(ot, NBConfiguration.empty(), List.of(), this);
String driverName = incompleteOpDef.takeOptionalStaticValue("driver", String.class)
.or(() -> incompleteOpDef.takeOptionalStaticValue("type",String.class))
.or(() -> defaultDriverOption)
@@ -97,7 +99,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
if (!adapters.containsKey(driverName)) {
DriverAdapter adapter = ServiceSelector.of(driverName, adapterLoader).get().orElseThrow(
() -> new OpConfigError("Unable to load driver adapter for name '" + driverName + "'")
() -> new OpConfigError("Unable to load driver adapter for name '" + driverName + '\'')
);
NBConfigModel combinedModel = yamlmodel;
@@ -119,15 +121,15 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
DriverAdapter adapter = adapters.get(driverName);
adapterlist.add(adapter);
ParsedOp pop = new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()));
ParsedOp pop = new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this);
Optional<String> discard = pop.takeOptionalStaticValue("driver", String.class);
pops.add(pop);
}
if (defaultDriverOption.isPresent()) {
long matchingDefault = mappers.keySet().stream().filter(n -> n.equals(defaultDriverOption.get())).count();
if (matchingDefault==0) {
logger.warn("All op templates used a different driver than the default '" + defaultDriverOption.get()+"'");
if (0 == matchingDefault) {
logger.warn("All op templates used a different driver than the default '{}'", defaultDriverOption.get());
}
}
@@ -137,9 +139,8 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
} catch (Exception e) {
if (e instanceof OpConfigError) {
throw e;
} else {
throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e);
}
throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e);
}
}
@@ -212,4 +213,9 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
});
}
}
@Override
public NBLabels getLabels() {
return super.getLabels();
}
}

View File

@@ -16,6 +16,7 @@
package io.nosqlbench.engine.api.activityimpl.uniform;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.NBReconfigurable;
@@ -35,55 +36,52 @@ import java.util.Optional;
public class StandardActivityType<A extends StandardActivity<?,?>> extends SimpleActivity implements ActivityType<A> {
private final static Logger logger = LogManager.getLogger("ACTIVITY");
private static final Logger logger = LogManager.getLogger("ACTIVITY");
private final Map<String,DriverAdapter> adapters = new HashMap<>();
public StandardActivityType(DriverAdapter<?,?> adapter, ActivityDef activityDef) {
public StandardActivityType(final DriverAdapter<?,?> adapter, final ActivityDef activityDef, final NBLabeledElement parentLabels) {
super(activityDef
.deprecate("type","driver")
.deprecate("yaml", "workload")
.deprecate("yaml", "workload"),
parentLabels
);
this.adapters.put(adapter.getAdapterName(),adapter);
if (adapter instanceof ActivityDefAware) {
((ActivityDefAware) adapter).setActivityDef(activityDef);
}
adapters.put(adapter.getAdapterName(),adapter);
if (adapter instanceof ActivityDefAware) ((ActivityDefAware) adapter).setActivityDef(activityDef);
}
public StandardActivityType(ActivityDef activityDef) {
super(activityDef);
public StandardActivityType(final ActivityDef activityDef, final NBLabeledElement parentLabels) {
super(activityDef, parentLabels);
}
@Override
public A getActivity(ActivityDef activityDef) {
if (activityDef.getParams().getOptionalString("async").isPresent()) {
public A getActivity(final ActivityDef activityDef, final NBLabeledElement parentLabels) {
if (activityDef.getParams().getOptionalString("async").isPresent())
throw new RuntimeException("This driver does not support async mode yet.");
}
return (A) new StandardActivity(activityDef);
return (A) new StandardActivity(activityDef, parentLabels);
}
@Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
public synchronized void onActivityDefUpdate(final ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
for (DriverAdapter adapter : adapters.values()) {
for (final DriverAdapter adapter : this.adapters.values())
if (adapter instanceof NBReconfigurable reconfigurable) {
NBConfigModel cfgModel = reconfigurable.getReconfigModel();
Optional<String> op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
final Optional<String> op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
if (op_yaml_loc.isPresent()) {
Map<String,Object> disposable = new LinkedHashMap<>(activityDef.getParams());
OpsDocList workload = OpsLoader.loadPath(op_yaml_loc.get(), disposable, "activities");
cfgModel=cfgModel.add(workload.getConfigModel());
final Map<String, Object> disposable = new LinkedHashMap<>(activityDef.getParams());
final OpsDocList workload = OpsLoader.loadPath(op_yaml_loc.get(), disposable, "activities");
cfgModel = cfgModel.add(workload.getConfigModel());
}
NBConfiguration cfg = cfgModel.apply(activityDef.getParams());
final NBConfiguration cfg = cfgModel.apply(activityDef.getParams());
reconfigurable.applyReconfig(cfg);
}
}
}
@Override
public ActionDispenser getActionDispenser(A activity) {
public ActionDispenser getActionDispenser(final A activity) {
return new StandardActionDispenser(activity);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,11 +17,10 @@
package io.nosqlbench.engine.api.extensions;
import com.codahale.metrics.MetricRegistry;
import io.nosqlbench.api.config.LabeledScenarioContext;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.Logger;
import javax.script.ScriptContext;
/**
* Any implementation of a SandboxExtension that is found in the runtime
* can be automatically loaded into the scenario scripting sandbox.
@@ -44,13 +43,13 @@ public interface ScriptingPluginInfo<T> {
* @param scriptContext The scripting context object, useful for interacting with the sandbox directly
* @return a new instance of an extension. The extension is given a logger if it desires.
*/
T getExtensionObject(Logger logger, MetricRegistry metricRegistry, ScriptContext scriptContext);
T getExtensionObject(Logger logger, MetricRegistry metricRegistry, LabeledScenarioContext scriptContext);
/**
* @return a simple name at the root of the variable namespace to anchor this extension.
*/
default String getBaseVariableName() {
return getClass().getAnnotation(Service.class).selector();
return this.getClass().getAnnotation(Service.class).selector();
}
/**

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,7 +17,7 @@
package io.nosqlbench.engine.api.metrics;
import com.codahale.metrics.Counter;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import java.util.ArrayList;
@@ -30,28 +30,26 @@ import java.util.concurrent.ConcurrentHashMap;
public class ExceptionCountMetrics {
private final ConcurrentHashMap<String, Counter> counters = new ConcurrentHashMap<>();
private final Counter allerrors;
private final ActivityDef activityDef;
private final NBLabeledElement parentLabels;
public ExceptionCountMetrics(ActivityDef activityDef) {
this.activityDef = activityDef;
allerrors=ActivityMetrics.counter(activityDef, "errorcounts.ALL");
public ExceptionCountMetrics(final NBLabeledElement parentLabels) {
this.parentLabels = parentLabels;
this.allerrors =ActivityMetrics.counter(parentLabels, "errorcounts.ALL");
}
public void count(String name) {
Counter c = counters.get(name);
if (c == null) {
synchronized (counters) {
c = counters.computeIfAbsent(
name,
k -> ActivityMetrics.counter(activityDef, "errorcounts." + name)
);
}
public void count(final String name) {
Counter c = this.counters.get(name);
if (null == c) synchronized (this.counters) {
c = this.counters.computeIfAbsent(
name,
k -> ActivityMetrics.counter(this.parentLabels, "errorcounts." + name)
);
}
c.inc();
allerrors.inc();
this.allerrors.inc();
}
public List<Counter> getCounters() {
return new ArrayList<>(counters.values());
return new ArrayList<>(this.counters.values());
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package io.nosqlbench.engine.api.metrics;
import com.codahale.metrics.Histogram;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
@@ -32,29 +33,29 @@ import java.util.concurrent.ConcurrentHashMap;
public class ExceptionHistoMetrics {
private final ConcurrentHashMap<String, Histogram> histos = new ConcurrentHashMap<>();
private final Histogram allerrors;
private final NBLabeledElement parentLabels;
private final ActivityDef activityDef;
public ExceptionHistoMetrics(ActivityDef activityDef) {
public ExceptionHistoMetrics(final NBLabeledElement parentLabels, final ActivityDef activityDef) {
this.parentLabels = parentLabels;
this.activityDef = activityDef;
allerrors = ActivityMetrics.histogram(activityDef, "errorhistos.ALL", activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4));
this.allerrors = ActivityMetrics.histogram(parentLabels, "errorhistos.ALL", activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4));
}
public void update(String name, long magnitude) {
Histogram h = histos.get(name);
if (h == null) {
synchronized (histos) {
h = histos.computeIfAbsent(
name,
k -> ActivityMetrics.histogram(activityDef, "errorhistos." + name, activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4))
);
}
public void update(final String name, final long magnitude) {
Histogram h = this.histos.get(name);
if (null == h) synchronized (this.histos) {
h = this.histos.computeIfAbsent(
name,
k -> ActivityMetrics.histogram(this.parentLabels, "errorhistos." + name, this.activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4))
);
}
h.update(magnitude);
allerrors.update(magnitude);
this.allerrors.update(magnitude);
}
public List<Histogram> getHistograms() {
return new ArrayList<>(histos.values());
return new ArrayList<>(this.histos.values());
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,7 +17,7 @@
package io.nosqlbench.engine.api.metrics;
import com.codahale.metrics.Meter;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
import java.util.ArrayList;
@@ -30,28 +30,26 @@ import java.util.concurrent.ConcurrentHashMap;
public class ExceptionMeterMetrics {
private final ConcurrentHashMap<String, Meter> meters = new ConcurrentHashMap<>();
private final Meter allerrors;
private final ActivityDef activityDef;
private final NBLabeledElement parentLabels;
public ExceptionMeterMetrics(ActivityDef activityDef) {
this.activityDef = activityDef;
allerrors = ActivityMetrics.meter(activityDef, "errormeters.ALL");
public ExceptionMeterMetrics(final NBLabeledElement parentLabels) {
this.parentLabels = parentLabels;
this.allerrors = ActivityMetrics.meter(parentLabels, "errormeters.ALL");
}
public void mark(String name) {
Meter c = meters.get(name);
if (c == null) {
synchronized (meters) {
c = meters.computeIfAbsent(
name,
k -> ActivityMetrics.meter(activityDef, "errormeters." + name)
);
}
public void mark(final String name) {
Meter c = this.meters.get(name);
if (null == c) synchronized (this.meters) {
c = this.meters.computeIfAbsent(
name,
k -> ActivityMetrics.meter(this.parentLabels, "errormeters." + name)
);
}
c.mark();
allerrors.mark();
this.allerrors.mark();
}
public List<Meter> getMeters() {
return new ArrayList<>(meters.values());
return new ArrayList<>(this.meters.values());
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,6 +17,7 @@
package io.nosqlbench.engine.api.metrics;
import com.codahale.metrics.Timer;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.metrics.ActivityMetrics;
@@ -32,31 +33,32 @@ public class ExceptionTimerMetrics {
private final ConcurrentHashMap<String, Timer> timers = new ConcurrentHashMap<>();
private final Timer allerrors;
private final ActivityDef activityDef;
private final NBLabeledElement parentLabels;
public ExceptionTimerMetrics(ActivityDef activityDef) {
public ExceptionTimerMetrics(final NBLabeledElement parentLabels, final ActivityDef activityDef) {
this.activityDef = activityDef;
allerrors = ActivityMetrics.timer(
activityDef,
this.parentLabels = parentLabels;
this.allerrors = ActivityMetrics.timer(
parentLabels,
"errortimers.ALL",
activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4)
);
}
public void update(String name, long nanosDuration) {
Timer timer = timers.get(name);
if (timer == null) {
synchronized (timers) {
timer = timers.computeIfAbsent(
name,
k -> ActivityMetrics.timer(activityDef, "errortimers." + name, activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4))
);
}
public void update(final String name, final long nanosDuration) {
Timer timer = this.timers.get(name);
if (null == timer) synchronized (this.timers) {
timer = this.timers.computeIfAbsent(
name,
k -> ActivityMetrics.timer(this.parentLabels, "errortimers." + name, this.activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4))
);
}
timer.update(nanosDuration, TimeUnit.NANOSECONDS);
allerrors.update(nanosDuration, TimeUnit.NANOSECONDS);
this.allerrors.update(nanosDuration, TimeUnit.NANOSECONDS);
}
public List<Timer> getTimers() {
return new ArrayList<>(timers.values());
return new ArrayList<>(this.timers.values());
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,11 +20,12 @@ import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Timer;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.handlers.CountErrorHandler;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.handlers.CounterErrorHandler;
import io.nosqlbench.util.NBMock;
import io.nosqlbench.util.NBMock.LogAppender;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Logger;
@@ -43,28 +44,28 @@ class NBErrorHandlerTest {
@Test
void testNullConfig() {
ErrorMetrics errorMetrics = new ErrorMetrics(ActivityDef.parseActivityDef("alias=testalias_stop"));
NBErrorHandler errhandler = new NBErrorHandler(() -> "stop", () -> errorMetrics);
final ErrorMetrics errorMetrics = new ErrorMetrics(NBLabeledElement.forKV("activity","testalias_stop"));
final NBErrorHandler errhandler = new NBErrorHandler(() -> "stop", () -> errorMetrics);
assertThatExceptionOfType(RuntimeException.class)
.isThrownBy(() -> errhandler.handleError(runtimeException, 1, 2));
.isThrownBy(() -> errhandler.handleError(this.runtimeException, 1, 2));
}
@Test
void testMultipleWithRetry() {
ErrorMetrics errorMetrics = new ErrorMetrics(ActivityDef.parseActivityDef("alias=testalias_wr"));
NBErrorHandler eh = new NBErrorHandler(() -> "warn,retry", () -> errorMetrics);
ErrorDetail detail = eh.handleError(runtimeException, 1, 2);
final ErrorMetrics errorMetrics = new ErrorMetrics(NBLabeledElement.forKV("activity","testalias_wr"));
final NBErrorHandler eh = new NBErrorHandler(() -> "warn,retry", () -> errorMetrics);
final ErrorDetail detail = eh.handleError(this.runtimeException, 1, 2);
assertThat(detail.isRetryable()).isTrue();
}
@Test
void testWarnErrorHandler() {
Logger logger = (Logger) LogManager.getLogger("ERRORS");
NBMock.LogAppender appender = NBMock.registerTestLogger(ERROR_HANDLER_APPENDER_NAME, logger, Level.WARN);
final Logger logger = (Logger) LogManager.getLogger("ERRORS");
final LogAppender appender = NBMock.registerTestLogger(NBErrorHandlerTest.ERROR_HANDLER_APPENDER_NAME, logger, Level.WARN);
ErrorMetrics errorMetrics = new ErrorMetrics(ActivityDef.parseActivityDef("alias=testalias_warn"));
NBErrorHandler eh = new NBErrorHandler(() -> "warn", () -> errorMetrics);
ErrorDetail detail = eh.handleError(runtimeException, 1, 2);
final ErrorMetrics errorMetrics = new ErrorMetrics(NBLabeledElement.forKV("activity","testalias_warn"));
final NBErrorHandler eh = new NBErrorHandler(() -> "warn", () -> errorMetrics);
final ErrorDetail detail = eh.handleError(this.runtimeException, 1, 2);
logger.getContext().stop(); // force any async appenders to flush
logger.getContext().start(); // resume processing
@@ -77,34 +78,34 @@ class NBErrorHandlerTest {
@Test
void testHistogramErrorHandler() {
ErrorMetrics errorMetrics = new ErrorMetrics(ActivityDef.parseActivityDef("alias=testalias_histos"));
NBErrorHandler eh = new NBErrorHandler(() -> "histogram", () -> errorMetrics);
ErrorDetail detail = eh.handleError(runtimeException, 1, 2);
final ErrorMetrics errorMetrics = new ErrorMetrics(NBLabeledElement.forKV("activity","testalias_histos"));
final NBErrorHandler eh = new NBErrorHandler(() -> "histogram", () -> errorMetrics);
final ErrorDetail detail = eh.handleError(this.runtimeException, 1, 2);
assertThat(detail.isRetryable()).isFalse();
List<Histogram> histograms = errorMetrics.getExceptionHistoMetrics().getHistograms();
final List<Histogram> histograms = errorMetrics.getExceptionHistoMetrics().getHistograms();
assertThat(histograms).hasSize(1);
}
@Test
void testTimerErrorHandler() {
ErrorMetrics errorMetrics = new ErrorMetrics(ActivityDef.parseActivityDef("alias=testalias_timers"));
NBErrorHandler eh = new NBErrorHandler(() -> "timer", () -> errorMetrics);
ErrorDetail detail = eh.handleError(runtimeException, 1, 2);
final ErrorMetrics errorMetrics = new ErrorMetrics(NBLabeledElement.forKV("activity","testalias_timers"));
final NBErrorHandler eh = new NBErrorHandler(() -> "timer", () -> errorMetrics);
final ErrorDetail detail = eh.handleError(this.runtimeException, 1, 2);
assertThat(detail.isRetryable()).isFalse();
List<Timer> histograms = errorMetrics.getExceptionTimerMetrics().getTimers();
final List<Timer> histograms = errorMetrics.getExceptionTimerMetrics().getTimers();
assertThat(histograms).hasSize(1);
}
@Test
void testCounterErrorHandler() {
Logger logger = (Logger) LogManager.getLogger(CounterErrorHandler.class);
NBMock.LogAppender appender = NBMock.registerTestLogger(ERROR_HANDLER_APPENDER_NAME, logger, Level.INFO);
final Logger logger = (Logger) LogManager.getLogger(CounterErrorHandler.class);
final LogAppender appender = NBMock.registerTestLogger(NBErrorHandlerTest.ERROR_HANDLER_APPENDER_NAME, logger, Level.INFO);
ErrorMetrics errorMetrics = new ErrorMetrics(ActivityDef.parseActivityDef("alias=testalias_counters"));
NBErrorHandler eh = new NBErrorHandler(() -> "counter", () -> errorMetrics);
ErrorDetail detail = eh.handleError(runtimeException, 1, 2);
final ErrorMetrics errorMetrics = new ErrorMetrics(NBLabeledElement.forKV("activity","testalias_counters"));
final NBErrorHandler eh = new NBErrorHandler(() -> "counter", () -> errorMetrics);
final ErrorDetail detail = eh.handleError(this.runtimeException, 1, 2);
assertThat(detail.isRetryable()).isFalse();
List<Counter> histograms = errorMetrics.getExceptionCountMetrics().getCounters();
final List<Counter> histograms = errorMetrics.getExceptionCountMetrics().getCounters();
assertThat(histograms).hasSize(1);
logger.getContext().stop(); // force any async appenders to flush
@@ -116,14 +117,14 @@ class NBErrorHandlerTest {
@Test
void testCountErrorHandler() {
Logger logger = (Logger) LogManager.getLogger(CountErrorHandler.class);
NBMock.LogAppender appender = NBMock.registerTestLogger(ERROR_HANDLER_APPENDER_NAME, logger, Level.WARN);
final Logger logger = (Logger) LogManager.getLogger(CountErrorHandler.class);
final LogAppender appender = NBMock.registerTestLogger(NBErrorHandlerTest.ERROR_HANDLER_APPENDER_NAME, logger, Level.WARN);
ErrorMetrics errorMetrics = new ErrorMetrics(ActivityDef.parseActivityDef("alias=testalias_count"));
NBErrorHandler eh = new NBErrorHandler(() -> "count", () -> errorMetrics);
ErrorDetail detail = eh.handleError(runtimeException, 1, 2);
final ErrorMetrics errorMetrics = new ErrorMetrics(NBLabeledElement.forKV("activity","testalias_count"));
final NBErrorHandler eh = new NBErrorHandler(() -> "count", () -> errorMetrics);
final ErrorDetail detail = eh.handleError(this.runtimeException, 1, 2);
assertThat(detail.isRetryable()).isFalse();
List<Counter> histograms = errorMetrics.getExceptionCountMetrics().getCounters();
final List<Counter> histograms = errorMetrics.getExceptionCountMetrics().getCounters();
assertThat(histograms).hasSize(1);
logger.getContext().stop(); // force any async appenders to flush
@@ -136,19 +137,19 @@ class NBErrorHandlerTest {
@Test
void testMeterErrorHandler() {
ErrorMetrics errorMetrics = new ErrorMetrics(ActivityDef.parseActivityDef("alias=testalias_meters"));
NBErrorHandler eh = new NBErrorHandler(() -> "meter", () -> errorMetrics);
ErrorDetail detail = eh.handleError(runtimeException, 1, 2);
final ErrorMetrics errorMetrics = new ErrorMetrics(NBLabeledElement.forKV("activity","testalias_meters"));
final NBErrorHandler eh = new NBErrorHandler(() -> "meter", () -> errorMetrics);
final ErrorDetail detail = eh.handleError(this.runtimeException, 1, 2);
assertThat(detail.isRetryable()).isFalse();
List<Meter> histograms = errorMetrics.getExceptionMeterMetrics().getMeters();
final List<Meter> histograms = errorMetrics.getExceptionMeterMetrics().getMeters();
assertThat(histograms).hasSize(1);
}
@Test
void testCodeShorthand() {
ErrorMetrics errorMetrics = new ErrorMetrics(ActivityDef.parseActivityDef("alias=testalias_meters"));
NBErrorHandler eh = new NBErrorHandler(() -> "handler=code code=42", () -> errorMetrics);
ErrorDetail detail = eh.handleError(runtimeException, 1, 2);
final ErrorMetrics errorMetrics = new ErrorMetrics(NBLabeledElement.forKV("activity","testalias_meters"));
final NBErrorHandler eh = new NBErrorHandler(() -> "handler=code code=42", () -> errorMetrics);
final ErrorDetail detail = eh.handleError(this.runtimeException, 1, 2);
assertThat(detail.isRetryable()).isFalse();
assertThat(detail.resultCode).isEqualTo(42);
}
@@ -156,8 +157,8 @@ class NBErrorHandlerTest {
@Test
void testErrorLogAppender() {
Logger logger = (Logger) LogManager.getLogger(ErrorHandler.class);
NBMock.LogAppender appender = NBMock.registerTestLogger(ERROR_HANDLER_APPENDER_NAME, logger, Level.DEBUG);
final Logger logger = (Logger) LogManager.getLogger(ErrorHandler.class);
final LogAppender appender = NBMock.registerTestLogger(NBErrorHandlerTest.ERROR_HANDLER_APPENDER_NAME, logger, Level.DEBUG);
logger.debug("NBErrorHandler is cool.");
logger.debug("I second that.");
@@ -165,7 +166,7 @@ class NBErrorHandlerTest {
logger.getContext().stop(); // force any async appenders to flush
logger.getContext().start(); // resume processing
List<String> entries = appender.getEntries();
final List<String> entries = appender.getEntries();
assertThat(entries).hasSize(2);
assertThat(appender.getFirstEntry()).isEqualTo("NBErrorHandler is cool.");
assertThat(entries.get(1)).isEqualTo("I second that.");

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,11 +16,13 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.api.engine.metrics.DeltaHdrHistogramReservoir;
import io.nosqlbench.api.testutils.Bounds;
import io.nosqlbench.api.testutils.Perf;
import io.nosqlbench.api.testutils.Result;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -42,39 +44,39 @@ public class RateLimiterPerfTestMethods {
// return perf;
// }
public Result systemTimeOverhead(RateLimiter rl) {
Bounds bounds = new Bounds(1000, 2);
Perf perf = new Perf("nanotime");
public Result systemTimeOverhead(final RateLimiter rl) {
final Bounds bounds = new Bounds(1000, 2);
final Perf perf = new Perf("nanotime");
while (!perf.isConverged(Result::getOpsPerSec, 0.01d, 3)) {
System.out.println("testing with opcount=" + bounds.getNextValue());
long start = System.nanoTime();
final long start = System.nanoTime();
for (long iter = 0; iter < bounds.getValue(); iter++) {
long result = System.nanoTime();
final long result = System.nanoTime();
}
long end = System.nanoTime();
final long end = System.nanoTime();
perf.add("nanotime/" + bounds.getValue(), start, end, bounds.getValue());
}
double[] deltas = perf.getDeltas(Result::getOpsPerSec);
final double[] deltas = perf.getDeltas(Result::getOpsPerSec);
return perf.getLastResult();
}
public Result rateLimiterSingleThreadedConvergence(Function<RateSpec, RateLimiter> rlf, RateSpec rs, long startingCycles, double margin) {
public Result rateLimiterSingleThreadedConvergence(final Function<RateSpec, RateLimiter> rlf, final RateSpec rs, final long startingCycles, final double margin) {
//rl.applyRateSpec(rl.getRateSpec().withOpsPerSecond(1E9));
Bounds bounds = new Bounds(startingCycles, 2);
Perf perf = new Perf("nanotime");
final Bounds bounds = new Bounds(startingCycles, 2);
final Perf perf = new Perf("nanotime");
while (!perf.isConverged(Result::getOpsPerSec, margin, 3)) {
System.out.println("testing with opcount=" + bounds.getNextValue() + " spec=" + rs);
RateLimiter rl = rlf.apply(rs);
long start = System.nanoTime();
final RateLimiter rl = rlf.apply(rs);
final long start = System.nanoTime();
for (long iter = 0; iter < bounds.getValue(); iter++) {
long result = rl.maybeWaitForOp();
final long result = rl.maybeWaitForOp();
}
long end = System.nanoTime();
final long end = System.nanoTime();
perf.add("rl/" + bounds.getValue(), start, end, bounds.getValue());
System.out.println(perf.getLastResult());
@@ -99,28 +101,28 @@ public class RateLimiterPerfTestMethods {
* @param count_rate_division_clientrate
* @return
*/
long[] testRateChanges(RateLimiter rl, int... count_rate_division_clientrate) {
long[] testRateChanges(final RateLimiter rl, final int... count_rate_division_clientrate) {
System.out.println("Running " + Thread.currentThread().getStackTrace()[1].getMethodName());
List<Long> results = new ArrayList<>();
final List<Long> results = new ArrayList<>();
for (int idx = 0; idx < count_rate_division_clientrate.length; idx += 4) {
int count = count_rate_division_clientrate[idx];
int rate = count_rate_division_clientrate[idx + 1];
int divisions = count_rate_division_clientrate[idx + 2];
int clientrate = count_rate_division_clientrate[idx + 3];
long clientnanos = (long) (1_000_000_000.0D / clientrate);
final int count = count_rate_division_clientrate[idx];
final int rate = count_rate_division_clientrate[idx + 1];
final int divisions = count_rate_division_clientrate[idx + 2];
final int clientrate = count_rate_division_clientrate[idx + 3];
final long clientnanos = (long) (1_000_000_000.0D / clientrate);
if (rl instanceof DiagUpdateRate) {
((DiagUpdateRate) rl).setDiagModulo(count / divisions);
System.out.println("updating every " + (count / divisions) + " calls (" + count + "/" + divisions + ")");
System.out.println("updating every " + count / divisions + " calls (" + count + '/' + divisions + ')');
}
System.out.println("count=" + count + ", getOpsPerSec=" + rate + ", div=" + divisions + ", clientrate=" + clientrate);
System.out.println("client nanos: " + clientnanos);
long startAt = System.nanoTime();
final long startAt = System.nanoTime();
rl.applyRateSpec(rl.getRateSpec().withOpsPerSecond(rate));
int perDivision = count / divisions;
final int perDivision = count / divisions;
long divDelay = 0L;
for (int div = 0; div < divisions; div++) {
long then = System.nanoTime();
@@ -134,25 +136,25 @@ public class RateLimiterPerfTestMethods {
results.add(divDelay);
}
long endAt = System.nanoTime();
double duration = (endAt - startAt) / 1000000000.0d;
double acqops = (count / duration);
final long endAt = System.nanoTime();
final double duration = (endAt - startAt) / 1000000000.0d;
final double acqops = count / duration;
System.out.println(rl);
System.out.println(ANSI_Blue +
String.format(
"spec: %s\n count: %9d, duration %.5fS, acquires/s %.3f, nanos/op: %f\n delay: %d (%.5fS)",
rl.getRateSpec(),
count, duration, acqops, (1_000_000_000.0d / acqops), divDelay, (divDelay / 1_000_000_000.0d)) +
ANSI_Reset);
String.format(
"spec: %s\n count: %9d, duration %.5fS, acquires/s %.3f, nanos/op: %f\n delay: %d (%.5fS)",
rl.getRateSpec(),
count, duration, acqops, 1_000_000_000.0d / acqops, divDelay, divDelay / 1_000_000_000.0d) +
ANSI_Reset);
}
long[] delays = results.stream().mapToLong(Long::longValue).toArray();
final long[] delays = results.stream().mapToLong(Long::longValue).toArray();
String delaySummary = Arrays.stream(delays).mapToDouble(d -> (double) d / 1_000_000_000.0D).mapToObj(d -> String.format("%.3f", d))
.collect(Collectors.joining(","));
final String delaySummary = Arrays.stream(delays).mapToDouble(d -> d / 1_000_000_000.0D).mapToObj(d -> String.format("%.3f", d))
.collect(Collectors.joining(","));
System.out.println("delays in seconds:\n" + delaySummary);
System.out.println("delays in ns:\n" + Arrays.toString(delays));
@@ -160,12 +162,12 @@ public class RateLimiterPerfTestMethods {
}
public Result rateLimiterContendedConvergence(int threads, Function<RateSpec, RateLimiter> rlFunc, RateSpec rateSpec, int initialIterations, double margin) {
Bounds bounds = new Bounds(initialIterations, 2);
Perf perf = new Perf("contended with " + threads + " threads");
public Result rateLimiterContendedConvergence(final int threads, final Function<RateSpec, RateLimiter> rlFunc, final RateSpec rateSpec, final int initialIterations, final double margin) {
final Bounds bounds = new Bounds(initialIterations, 2);
final Perf perf = new Perf("contended with " + threads + " threads");
while (!perf.isConverged(Result::getOpsPerSec, margin, 3)) {
Perf delegateperf = testRateLimiterMultiThreadedContention(rlFunc, rateSpec, initialIterations, threads);
final Perf delegateperf = this.testRateLimiterMultiThreadedContention(rlFunc, rateSpec, initialIterations, threads);
perf.add(delegateperf.getLastResult());
}
return perf.getLastResult();
@@ -175,48 +177,42 @@ public class RateLimiterPerfTestMethods {
* This a low-overhead test for multi-threaded access to the same getOpsPerSec limiter. It calculates the
* effective concurrent getOpsPerSec under atomic contention.
*/
public Perf testRateLimiterMultiThreadedContention(Function<RateSpec, RateLimiter> rlFunc, RateSpec spec, long iterations, int threadCount) {
public Perf testRateLimiterMultiThreadedContention(final Function<RateSpec, RateLimiter> rlFunc, final RateSpec spec, final long iterations, final int threadCount) {
System.out.println("Running " + Thread.currentThread().getStackTrace()[1].getMethodName());
RateLimiter rl = rlFunc.apply(spec);
double rate = spec.getRate();
int iterationsPerThread = (int) (iterations / threadCount);
if (iterationsPerThread >= Integer.MAX_VALUE) {
throw new RuntimeException("iterations per thread too high with (count,threads)=(" + iterations + "," + threadCount);
}
RateLimiterPerfTestMethods.TestExceptionHandler errorhandler = new RateLimiterPerfTestMethods.TestExceptionHandler();
RateLimiterPerfTestMethods.TestThreadFactory threadFactory = new RateLimiterPerfTestMethods.TestThreadFactory(errorhandler);
ExecutorService tp = Executors.newFixedThreadPool(threadCount + 1, threadFactory);
final RateLimiter rl = rlFunc.apply(spec);
final double rate = spec.getRate();
final int iterationsPerThread = (int) (iterations / threadCount);
if (Integer.MAX_VALUE <= iterationsPerThread)
throw new RuntimeException("iterations per thread too high with (count,threads)=(" + iterations + ',' + threadCount);
final TestExceptionHandler errorhandler = new TestExceptionHandler();
final TestThreadFactory threadFactory = new TestThreadFactory(errorhandler);
final ExecutorService tp = Executors.newFixedThreadPool(threadCount + 1, threadFactory);
System.out.format("Running %,d iterations split over %,d threads (%,d per) at %,.3f ops/s\n", iterations, threadCount, (iterations / threadCount), rate);
RateLimiterPerfTestMethods.Acquirer[] threads = new RateLimiterPerfTestMethods.Acquirer[threadCount];
DeltaHdrHistogramReservoir stats = new DeltaHdrHistogramReservoir("times", 5);
System.out.format("Running %,d iterations split over %,d threads (%,d per) at %,.3f ops/s\n", iterations, threadCount, iterations / threadCount, rate);
final Acquirer[] threads = new Acquirer[threadCount];
final DeltaHdrHistogramReservoir stats = new DeltaHdrHistogramReservoir(NBLabels.forKV("name", "times"), 5);
CyclicBarrier barrier = new CyclicBarrier(threadCount + 1);
final CyclicBarrier barrier = new CyclicBarrier(threadCount + 1);
RateLimiterStarter starter = new RateLimiterStarter(barrier, rl);
final RateLimiterStarter starter = new RateLimiterStarter(barrier, rl);
for (int i = 0; i < threadCount; i++) {
threads[i] = new RateLimiterPerfTestMethods.Acquirer(i, rl, iterationsPerThread, stats, barrier);
// threads[i] = new RateLimiterPerfTestMethods.Acquirer(i, rl, (int) (iterations / threadCount), stats, barrier);
}
// threads[i] = new RateLimiterPerfTestMethods.Acquirer(i, rl, (int) (iterations / threadCount), stats, barrier);
for (int i = 0; i < threadCount; i++) threads[i] = new Acquirer(i, rl, iterationsPerThread, stats, barrier);
tp.execute(starter);
System.out.println(rl);
System.out.format("submitting (%d threads)...\n", threads.length);
List<Future<Result>> futures = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
futures.add(tp.submit((Callable<Result>) threads[i]));
}
final List<Future<Result>> futures = new ArrayList<>();
for (int i = 0; i < threadCount; i++) futures.add(tp.submit((Callable<Result>) threads[i]));
System.out.format("submitted (%d threads)...\n", threads.length);
try {
tp.shutdown();
if (!tp.awaitTermination(1000, TimeUnit.SECONDS)) {
if (!tp.awaitTermination(1000, TimeUnit.SECONDS))
throw new RuntimeException("Failed to shutdown thread pool.");
}
} catch (InterruptedException e) {
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
@@ -224,11 +220,11 @@ public class RateLimiterPerfTestMethods {
System.out.println(rl);
Perf aggregatePerf = new Perf("contended with " + threadCount + " threads for " + iterations + " iterations for " + rl.getRateSpec().toString());
final Perf aggregatePerf = new Perf("contended with " + threadCount + " threads for " + iterations + " iterations for " + rl.getRateSpec().toString());
futures.stream().map(f -> {
try {
return f.get();
} catch (Exception e) {
} catch (final Exception e) {
throw new RuntimeException(e);
}
}).forEachOrdered(aggregatePerf::add);
@@ -239,7 +235,7 @@ public class RateLimiterPerfTestMethods {
// String refillLog = ((HybridRateLimiter) rl).getRefillLog();
// System.out.println("refill log:\n" + refillLog);
// }
Perf perf = aggregatePerf.reduceConcurrent();
final Perf perf = aggregatePerf.reduceConcurrent();
return perf;
}
@@ -248,7 +244,7 @@ public class RateLimiterPerfTestMethods {
private final CyclicBarrier barrier;
private final RateLimiter rl;
public RateLimiterStarter(CyclicBarrier barrier, RateLimiter rl) {
public RateLimiterStarter(final CyclicBarrier barrier, final RateLimiter rl) {
this.barrier = barrier;
this.rl = rl;
}
@@ -257,31 +253,29 @@ public class RateLimiterPerfTestMethods {
public void run() {
try {
// System.out.println("awaiting barrier (starter) (" + barrier.getNumberWaiting() + " awaiting)");
barrier.await(60, TimeUnit.SECONDS);
this.barrier.await(60, TimeUnit.SECONDS);
// System.out.println("started the rate limiter (starter) (" + barrier.getNumberWaiting() + " awaiting)");
} catch (Exception e) {
} catch (final Exception e) {
throw new RuntimeException(e);
}
rl.start();
this.rl.start();
}
}
private static class TestExceptionHandler implements Thread.UncaughtExceptionHandler {
private static class TestExceptionHandler implements UncaughtExceptionHandler {
public List<Throwable> throwables = new ArrayList<>();
public List<Thread> threads = new ArrayList<>();
@Override
public void uncaughtException(Thread t, Throwable e) {
threads.add(t);
throwables.add(e);
public void uncaughtException(final Thread t, final Throwable e) {
this.threads.add(t);
this.throwables.add(e);
System.out.println("uncaught exception on thread " + t.getName() + ": " + e.toString());
}
public void throwIfAny() {
if (throwables.size() > 0) {
throw new RuntimeException(throwables.get(0));
}
if (0 < throwables.size()) throw new RuntimeException(this.throwables.get(0));
}
}
@@ -292,8 +286,8 @@ public class RateLimiterPerfTestMethods {
private final CyclicBarrier barrier;
private final long iterations;
public Acquirer(int i, RateLimiter limiter, int iterations, DeltaHdrHistogramReservoir reservoir, CyclicBarrier barrier) {
this.threadIdx = i;
public Acquirer(final int i, final RateLimiter limiter, final int iterations, final DeltaHdrHistogramReservoir reservoir, final CyclicBarrier barrier) {
threadIdx = i;
this.limiter = limiter;
this.iterations = iterations;
this.reservoir = reservoir;
@@ -304,47 +298,41 @@ public class RateLimiterPerfTestMethods {
public Result call() {
// synchronized (barrier) {
try {
if (this.threadIdx == 0) {
System.out.println("awaiting barrier");
}
barrier.await(60, TimeUnit.SECONDS);
if (this.threadIdx == 0) {
System.out.println("starting all threads");
}
if (0 == this.threadIdx) System.out.println("awaiting barrier");
this.barrier.await(60, TimeUnit.SECONDS);
if (0 == this.threadIdx) System.out.println("starting all threads");
} catch (Exception be) {
} catch (final Exception be) {
throw new RuntimeException(be); // This should not happen unless the test is broken
}
// }
long startTime = System.nanoTime();
for (int i = 0; i < iterations; i++) {
long time = limiter.maybeWaitForOp();
final long startTime = System.nanoTime();
for (int i = 0; i < this.iterations; i++) {
final long time = this.limiter.maybeWaitForOp();
}
long endTime = System.nanoTime();
return new Result("thread " + this.threadIdx, startTime, endTime, iterations);
final long endTime = System.nanoTime();
return new Result("thread " + threadIdx, startTime, endTime, this.iterations);
}
@Override
public void run() {
for (int i = 0; i < iterations; i++) {
limiter.maybeWaitForOp();
}
for (int i = 0; i < this.iterations; i++) this.limiter.maybeWaitForOp();
}
}
private static class TestThreadFactory implements ThreadFactory {
private final Thread.UncaughtExceptionHandler handler;
private final UncaughtExceptionHandler handler;
public TestThreadFactory(Thread.UncaughtExceptionHandler uceh) {
this.handler = uceh;
public TestThreadFactory(final UncaughtExceptionHandler uceh) {
handler = uceh;
}
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler(handler);
public Thread newThread(final Runnable r) {
final Thread t = new Thread(r);
t.setUncaughtExceptionHandler(this.handler);
return t;
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,9 +16,10 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.testutils.Perf;
import io.nosqlbench.api.testutils.Result;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateSpec.Verb;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -27,83 +28,83 @@ import java.util.function.Function;
public class TestHybridRateLimiterPerf {
private final Function<RateSpec, RateLimiter> rlFunction = rs -> new HybridRateLimiter(ActivityDef.parseActivityDef("alias=tokenrl"),"hybrid", rs.withVerb(RateSpec.Verb.start));
private final Function<RateSpec, RateLimiter> rlFunction = rs -> new HybridRateLimiter(NBLabeledElement.EMPTY,"hybrid", rs.withVerb(Verb.start));
private final RateLimiterPerfTestMethods methods = new RateLimiterPerfTestMethods();
@Test
@Disabled
public void testPerf1e9() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E9, 1.1),10_000_000,0.01d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E9, 1.1),10_000_000,0.01d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e8() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E8, 1.1),50_000_000,0.005d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E8, 1.1),50_000_000,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e7() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E7, 1.1),5_000_000,0.01d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E7, 1.1),5_000_000,0.01d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e6() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E6, 1.1),500_000,0.005d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E6, 1.1),500_000,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e5() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E5, 1.1),50_000,0.01d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E5, 1.1),50_000,0.01d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e4() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E4, 1.1),5_000,0.005d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E4, 1.1),5_000,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e3() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E3, 1.1),500,0.005d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E3, 1.1),500,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e2() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E2, 1.1),50,0.005d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E2, 1.1),50,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e1() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E1, 1.1),5,0.005d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E1, 1.1),5,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e0() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E0, 1.1),2,0.005d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E0, 1.1),2,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testePerf1eN1() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E-1, 1.1),1,0.005d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E-1, 1.1),1,0.005d);
System.out.println(result);
}
@@ -111,14 +112,14 @@ public class TestHybridRateLimiterPerf {
@Test
@Disabled
public void test100Mops_160threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 10_000_000,160);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 10_000_000,160);
System.out.println(perf.getLastResult());
}
@Test
@Disabled
public void test100Mops_80threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 10_000_000,80);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 10_000_000,80);
System.out.println(perf.getLastResult());
}
@@ -131,7 +132,7 @@ public class TestHybridRateLimiterPerf {
@Test
@Disabled
public void test100Mops_40threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 10_000_000,40);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 10_000_000,40);
System.out.println(perf.getLastResult());
}
@@ -151,7 +152,7 @@ public class TestHybridRateLimiterPerf {
@Test
@Disabled
public void test100Mops_20threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 10_000_000,20);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 10_000_000,20);
System.out.println(perf.getLastResult());
}
@@ -164,7 +165,7 @@ public class TestHybridRateLimiterPerf {
@Test
@Disabled
public void test100Mops_10threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 10_000_000,10);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 10_000_000,10);
System.out.println(perf.getLastResult());
}
@@ -178,7 +179,7 @@ public class TestHybridRateLimiterPerf {
@Test
@Disabled
public void test100Mops_5threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 40_000_000,5);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 40_000_000,5);
System.out.println(perf.getLastResult());
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,8 +16,9 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.testutils.Perf;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateSpec.Verb;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -32,7 +33,7 @@ import java.util.function.Function;
*/
public class TestRateLimiterPerf1E7 {
private final Function<RateSpec, RateLimiter> rlFunction = rs -> new HybridRateLimiter(ActivityDef.parseActivityDef("alias=tokenrl"),"hybrid", rs.withVerb(RateSpec.Verb.configure));
private final Function<RateSpec, RateLimiter> rlFunction = rs -> new HybridRateLimiter(NBLabeledElement.forKV("alias","tokenrl"),"hybrid", rs.withVerb(Verb.configure));
private final RateLimiterPerfTestMethods methods = new RateLimiterPerfTestMethods();
// 160 threads at 10_000_000 ops/s
@@ -41,7 +42,7 @@ public class TestRateLimiterPerf1E7 {
@Test
@Disabled
public void test10Mops_160threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E7, 1.1), 20_000_000,160);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E7, 1.1), 20_000_000,160);
System.out.println(perf.getLastResult());
}
@@ -51,7 +52,7 @@ public class TestRateLimiterPerf1E7 {
@Test
@Disabled
public void test10Mops_80threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E7, 1.1), 20_000_000,80);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E7, 1.1), 20_000_000,80);
System.out.println(perf.getLastResult());
}
@@ -61,7 +62,7 @@ public class TestRateLimiterPerf1E7 {
@Test
@Disabled
public void test10Mops_40threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E7, 1.1), 20_000_000,40);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E7, 1.1), 20_000_000,40);
System.out.println(perf.getLastResult());
}
@@ -71,7 +72,7 @@ public class TestRateLimiterPerf1E7 {
@Test
@Disabled
public void test10Mops_20threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E7, 10), 20_000_000,20);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E7, 10), 20_000_000,20);
System.out.println(perf.getLastResult());
}
@@ -85,7 +86,7 @@ public class TestRateLimiterPerf1E7 {
@Test
@Disabled
public void test10Mops_10threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E7, 1.1), 20_000_000,10);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E7, 1.1), 20_000_000,10);
System.out.println(perf.getLastResult());
}
@@ -100,7 +101,7 @@ public class TestRateLimiterPerf1E7 {
@Test
@Disabled
public void test10Mops_5threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E7, 1.1), 20_000_000,5);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E7, 1.1), 20_000_000,5);
System.out.println(perf.getLastResult());
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,8 +16,9 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.testutils.Perf;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateSpec.Verb;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -32,20 +33,22 @@ import java.util.function.Function;
*/
public class TestRateLimiterPerf1E8 {
NBLabeledElement def = NBLabeledElement.forKV("alias","tokenrl");
private final Function<RateSpec, RateLimiter> rlFunction =
rs -> new HybridRateLimiter(
ActivityDef.parseActivityDef("alias=tokenrl"),
this.def,
"hybrid",
rs.withVerb(RateSpec.Verb.configure)
rs.withVerb(Verb.configure)
);
private final RateLimiterPerfTestMethods methods = new RateLimiterPerfTestMethods();
@Test
@Disabled
public void test100Mops_4000threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(
rlFunction,
new RateSpec(1E8, 1.1),
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(
this.rlFunction,
new RateSpec(1.0E8, 1.1),
100_000_000,
4000
);
@@ -55,9 +58,9 @@ public class TestRateLimiterPerf1E8 {
@Test
@Disabled
public void test100Mops_2000threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(
rlFunction,
new RateSpec(1E8, 1.1),
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(
this.rlFunction,
new RateSpec(1.0E8, 1.1),
100_000_000,
2000
);
@@ -67,9 +70,9 @@ public class TestRateLimiterPerf1E8 {
@Test
@Disabled
public void test100Mops_1000threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(
rlFunction,
new RateSpec(1E8, 1.1),
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(
this.rlFunction,
new RateSpec(1.0E8, 1.1),
100_000_000,
1000
);
@@ -79,9 +82,9 @@ public class TestRateLimiterPerf1E8 {
@Test
@Disabled
public void test100Mops_320threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(
rlFunction,
new RateSpec(1E8, 1.1),
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(
this.rlFunction,
new RateSpec(1.0E8, 1.1),
100_000_000,
320
);
@@ -98,9 +101,9 @@ public class TestRateLimiterPerf1E8 {
@Test
@Disabled
public void test100Mops_160threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(
rlFunction,
new RateSpec(1E8, 1.1),
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(
this.rlFunction,
new RateSpec(1.0E8, 1.1),
100_000_000,
160
);
@@ -114,7 +117,7 @@ public class TestRateLimiterPerf1E8 {
@Test
@Disabled
public void test100Mops_80threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 100_000_000, 80);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 100_000_000, 80);
System.out.println(perf.getLastResult());
}
@@ -127,7 +130,7 @@ public class TestRateLimiterPerf1E8 {
@Test
@Disabled
public void test100Mops_40threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 100_000_000, 40);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 100_000_000, 40);
System.out.println(perf.getLastResult());
}
@@ -147,7 +150,7 @@ public class TestRateLimiterPerf1E8 {
@Test
@Disabled
public void test100Mops_20threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 100_000_000, 20);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 100_000_000, 20);
System.out.println(perf.getLastResult());
}
@@ -163,7 +166,7 @@ public class TestRateLimiterPerf1E8 {
@Test
@Disabled
public void test100Mops_10threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 100_000_000, 10);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 100_000_000, 10);
System.out.println(perf.getLastResult());
}
@@ -180,7 +183,7 @@ public class TestRateLimiterPerf1E8 {
@Test
@Disabled
public void test100Mops_5threads() {
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 100_000_000, 5);
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 100_000_000, 5);
System.out.println(perf.getLastResult());
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,8 +16,9 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.testutils.Result;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateSpec.Verb;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -32,83 +33,83 @@ import java.util.function.Function;
*/
public class TestRateLimiterPerfSingle {
private final Function<RateSpec, RateLimiter> rlFunction = rs -> new HybridRateLimiter(ActivityDef.parseActivityDef("alias=tokenrl"),"hybrid", rs.withVerb(RateSpec.Verb.start));
private final Function<RateSpec, RateLimiter> rlFunction = rs -> new HybridRateLimiter(NBLabeledElement.forKV("alias","tokenrl"),"hybrid", rs.withVerb(Verb.start));
private final RateLimiterPerfTestMethods methods = new RateLimiterPerfTestMethods();
@Test
@Disabled
public void testPerf1e9() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E9, 1.1),10_000_000,0.01d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E9, 1.1),10_000_000,0.01d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e8() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E8, 1.1),50_000_000,0.005d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E8, 1.1),50_000_000,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e7() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E7, 1.1),5_000_000,0.01d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E7, 1.1),5_000_000,0.01d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e6() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E6, 1.1),500_000,0.005d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E6, 1.1),500_000,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e5() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E5, 1.1),50_000,0.01d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E5, 1.1),50_000,0.01d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e4() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E4, 1.1),5_000,0.005d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E4, 1.1),5_000,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e3() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E3, 1.1),500,0.005d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E3, 1.1),500,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e2() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E2, 1.1),50,0.005d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E2, 1.1),50,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e1() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E1, 1.1),5,0.005d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E1, 1.1),5,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e0() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E0, 1.1),2,0.005d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E0, 1.1),2,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testePerf1eN1() {
Result result = methods.rateLimiterSingleThreadedConvergence(rlFunction,new RateSpec(1E-1, 1.1),1,0.005d);
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E-1, 1.1),1,0.005d);
System.out.println(result);
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,7 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.api.config.NBLabeledElement;
import java.util.concurrent.atomic.AtomicLong;
@@ -24,27 +24,27 @@ public class TestableHybridRateLimiter extends HybridRateLimiter {
private final AtomicLong clock;
public TestableHybridRateLimiter(AtomicLong clock, RateSpec rateSpec, NBNamedElement def) {
public TestableHybridRateLimiter(final AtomicLong clock, final RateSpec rateSpec, final NBLabeledElement def) {
super(def, "test", rateSpec);
applyRateSpec(rateSpec);
setLabel("test");
this.applyRateSpec(rateSpec);
this.setLabel("test");
this.clock = clock;
init(def);
this.init(def);
}
public long setClock(long newValue) {
long oldValue = clock.get();
clock.set(newValue);
public long setClock(final long newValue) {
final long oldValue = this.clock.get();
this.clock.set(newValue);
return oldValue;
}
public long getClock() {
return clock.get();
return this.clock.get();
}
@Override
protected long getNanoClockTime() {
return clock.get();
return this.clock.get();
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,7 +16,8 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.config.NBNamedElement;
import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.activityimpl.ParameterMap;
import org.junit.jupiter.api.Test;
@@ -25,11 +26,13 @@ import static org.assertj.core.api.Assertions.assertThat;
public class TokenPoolTest {
ActivityDef def = new ActivityDef(ParameterMap.parseOrException("alias=testing"));
ActivityDef adef = new ActivityDef(ParameterMap.parseOrException("alias=testing"));
NBLabeledElement def = NBLabeledElement.forMap(this.adef.getParams().getStringStringMap());
@Test
public void testBackfillFullRate() {
ThreadDrivenTokenPool p = new ThreadDrivenTokenPool(new RateSpec(10000000, 1.1), def);
ThreadDrivenTokenPool p = new ThreadDrivenTokenPool(new RateSpec(10000000, 1.1), this.def);
assertThat(p.refill(1000000L)).isEqualTo(1000000L);
assertThat(p.getWaitPool()).isEqualTo(0L);
assertThat(p.refill(100L)).isEqualTo(1000100);
@@ -60,10 +63,10 @@ public class TokenPoolTest {
assertThat(p.getWaitTime()).isEqualTo(10000000L);
RateSpec s2 = new RateSpec(1000000L, 1.10D);
p.apply(new NBNamedElement() {
p.apply(new NBLabeledElement() {
@Override
public String getName() {
return "test";
public NBLabels getLabels() {
return NBLabels.forKV("name","test");
}
},s2);

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,9 +16,10 @@
package io.nosqlbench.engine.api.metrics;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.api.engine.metrics.DeltaHdrHistogramReservoir;
import io.nosqlbench.api.engine.metrics.HistoIntervalLogger;
import io.nosqlbench.api.engine.metrics.NicerHistogram;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricHistogram;
import org.HdrHistogram.EncodableHistogram;
import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramLogReader;
@@ -44,16 +45,16 @@ public class HistoIntervalLoggerTest {
final int significantDigits = 4;
NicerHistogram nicerHistogram = new NicerHistogram(
"histo1", new DeltaHdrHistogramReservoir("histo1", significantDigits));
NBMetricHistogram NBHistogram = new NBMetricHistogram(
NBLabels.forKV("name", "histo1"), new DeltaHdrHistogramReservoir(NBLabels.forKV("name", "histo1"), significantDigits));
hil.onHistogramAdded("histo1",nicerHistogram);
hil.onHistogramAdded("histo1", NBHistogram);
nicerHistogram.update(1L);
NBHistogram.update(1L);
delay(1001);
nicerHistogram.update(1000000L);
NBHistogram.update(1000000L);
delay(1001);
nicerHistogram.update(1000L);
NBHistogram.update(1000L);
hil.onHistogramRemoved("histo1");
hil.closeMetrics();
@@ -63,7 +64,7 @@ public class HistoIntervalLoggerTest {
EncodableHistogram histogram;
while (true) {
histogram = hlr.nextIntervalHistogram();
if (histogram==null) {
if (null == histogram) {
break;
}
histos.add(histogram);
@@ -71,15 +72,15 @@ public class HistoIntervalLoggerTest {
assertThat(histos.size()).isEqualTo(2);
assertThat(histos.get(0)).isInstanceOf(Histogram.class);
assertThat(((Histogram)histos.get(0)).getNumberOfSignificantValueDigits()).isEqualTo(significantDigits);
assertThat(((Histogram) histos.get(0)).getNumberOfSignificantValueDigits()).isEqualTo(significantDigits);
}
private void delay(int i) {
long now = System.currentTimeMillis();
long target = now+i;
while (System.currentTimeMillis()<target) {
long target = now + i;
while (System.currentTimeMillis() < target) {
try {
Thread.sleep(target-System.currentTimeMillis());
Thread.sleep(target - System.currentTimeMillis());
} catch (InterruptedException ignored) {
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,26 +16,28 @@
package io.nosqlbench.engine.api.metrics;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.api.engine.metrics.ConvenientSnapshot;
import io.nosqlbench.api.engine.metrics.DeltaHdrHistogramReservoir;
import io.nosqlbench.api.engine.metrics.NicerHistogram;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricHistogram;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class NicerHistogramTest {
public class NBMetricHistogramTest {
@Test
public void testNicerHistogramValues() {
NicerHistogram nh = new NicerHistogram("testhisto",new DeltaHdrHistogramReservoir("testhisto",4));
for (int i = 1; i <= 100; i++) {
NBMetricHistogram nh = new NBMetricHistogram(NBLabels.forKV("name","testhisto"), new DeltaHdrHistogramReservoir(
NBLabels.forKV("name", "testhisto"), 4));
for (int i = 1; 100 >= i; i++) {
nh.update(i);
}
ConvenientSnapshot snapshot = nh.getSnapshot();
assertThat(snapshot.getMax()).isEqualTo(100);
nh.getDeltaSnapshot(500); // Just to reset
for (int i=1; i<= 200; i++ ) {
for (int i = 1; 200 >= i; i++) {
nh.update(i);
}
ConvenientSnapshot deltaSnapshot1 = nh.getDeltaSnapshot(500);
@@ -43,7 +45,7 @@ public class NicerHistogramTest {
ConvenientSnapshot cachedSnapshot = nh.getSnapshot();
assertThat(cachedSnapshot.getMax()).isEqualTo(200);
for (int i=1; i<= 300; i++ ) {
for (int i = 1; 300 >= i; i++) {
nh.update(i);
}
ConvenientSnapshot stillCachedSnapshot = nh.getSnapshot();

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 nosqlbench
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@ package io.nosqlbench.engine.api.metrics;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Snapshot;
import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.api.engine.metrics.DeltaHdrHistogramReservoir;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -27,33 +28,29 @@ public class TestHistoTypes {
@Test
@Disabled
public void compareHistos() {
Clock c = new Clock();
final Clock c = new Clock();
// Use the defaults that you get with "Timer()"
ExponentiallyDecayingReservoir expRes = new ExponentiallyDecayingReservoir(1028,0.015,c);
DeltaHdrHistogramReservoir hdrRes = new DeltaHdrHistogramReservoir("dr",4);
long max=100000000;
final ExponentiallyDecayingReservoir expRes = new ExponentiallyDecayingReservoir(1028,0.015,c);
final DeltaHdrHistogramReservoir hdrRes = new DeltaHdrHistogramReservoir(NBLabels.forKV("name", "dr"),4);
final long max=100000000;
for (long i = 0; i < max; i++) {
expRes.update(i);
hdrRes.update(i);
if ((i%1000000)==0) {
System.out.println(i);
}
if (0 == (i % 1000000)) System.out.println(i);
}
summary(0L,max, expRes.getSnapshot(), hdrRes.getSnapshot());
this.summary(0L,max, expRes.getSnapshot(), hdrRes.getSnapshot());
}
private void summary(long min, long max,Snapshot... snapshots) {
for (int i = 0; i <=100; i++) {
double pct = (double)i/100.0D;
double expectedValue=pct*max;
private void summary(final long min, final long max, final Snapshot... snapshots) {
for (int i = 0; 100 >= i; i++) {
final double pct = i /100.0D;
final double expectedValue=pct*max;
System.out.format("% 3d %%p is % 11d : ",(long)(pct*100),(long)expectedValue);
for (Snapshot snapshot : snapshots) {
System.out.format("% 10d ",(long)snapshot.getValue(pct));
}
System.out.print("\n");
for (final Snapshot snapshot : snapshots) System.out.format("% 10d ", (long) snapshot.getValue(pct));
System.out.print('\n');
}
}
@@ -63,12 +60,12 @@ public class TestHistoTypes {
@Override
public long getTime() {
return nanos/1000000;
return this.nanos /1000000;
}
@Override
public long getTick() {
return nanos;
return this.nanos;
}
}
}