refactor and replace rate limiter implementation

Jonathan Shook 2023-10-09 17:02:56 -05:00
parent 9e91a6201d
commit 30cdd280cc
30 changed files with 722 additions and 2387 deletions

View File

@ -56,7 +56,7 @@ public class DiagSpace implements ActivityDefObserver, AutoCloseable {
public void maybeWaitForOp(double diagrate) {
if (diagRateLimiter != null) {
long waittime = diagRateLimiter.maybeWaitForOp();
long waittime = diagRateLimiter.block();
}
}

View File

@ -19,7 +19,7 @@ package io.nosqlbench.adapter.diag.optasks;
import io.nosqlbench.api.config.standard.*;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiters;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateSpec;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRateSpec;
import io.nosqlbench.nb.annotations.Service;
import java.util.Map;
@ -28,15 +28,14 @@ import java.util.Map;
public class DiagTask_diagrate extends BaseDiagTask implements NBReconfigurable {
private String name;
private RateLimiter rateLimiter;
private RateSpec rateSpec;
private SimRateSpec simRateSpec;
private void updateRateLimiter(String spec) {
this.rateSpec = new RateSpec(spec);
this.simRateSpec = new SimRateSpec(spec);
rateLimiter = RateLimiters.createOrUpdate(
this.parent,
"diag",
rateLimiter,
rateSpec
simRateSpec
);
}
@ -68,7 +67,7 @@ public class DiagTask_diagrate extends BaseDiagTask implements NBReconfigurable
@Override
public Map<String, Object> apply(Long aLong, Map<String, Object> stringObjectMap) {
rateLimiter.maybeWaitForOp();
rateLimiter.block();
return stringObjectMap;
}
}
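For reference, the call-site pattern this commit converges on looks roughly like the sketch below. Only RateLimiters.createOrUpdate, SimRateSpec, and RateLimiter.block() are taken from this diff; the surrounding class and method names are illustrative and not part of the commit.

    import io.nosqlbench.components.NBComponent;
    import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
    import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiters;
    import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRateSpec;

    // Illustrative sketch only: update (or lazily create) a limiter from a spec string,
    // then gate each operation with block() where maybeWaitForOp() used to be called.
    final class RateGatedTask {
        private RateLimiter limiter;

        void updateRate(NBComponent parent, String spec) {
            limiter = RateLimiters.createOrUpdate(parent, limiter, new SimRateSpec(spec));
        }

        void gatedOp(Runnable op) {
            long nanosBehind = limiter.block();   // replaces the old maybeWaitForOp()
            op.run();
        }
    }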

View File

@ -1,21 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
public interface DiagUpdateRate {
void setDiagModulo(long diagModulo);
}

View File

@ -1,243 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
import com.codahale.metrics.Gauge;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.components.NBBaseComponent;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.atomic.AtomicLong;
/**
* <H2>Synopsis</H2>
* <p>This rate limiter uses nanoseconds as the unit of timing. This
* works well because it is the native precision of the system timer
* interface via {@link System#nanoTime()}. It is also low-error
* in terms of rounding between floating point rates and nanoseconds,
* at least in the round numbers that users tend to use. Further,
* the current scheduling state is maintained as an atomic view of
* accumulated nanoseconds granted to callers -- referred to here as the
* ticks accumulator. This further simplifies the implementation by
* allowing direct comparison of scheduled times with the current
* state of the high-resolution system timer.
*
* <H2>Design Notes</H2>
* This implementation makes certain trade-offs needed to support a combination of requirements.
* Specifically, some small degree of inaccuracy is allowed to enable higher throughput when
* needed. Some practical limitations affect how accurate we can be:
*
* <OL>
* <LI>This is not a real-time system with guarantees on scheduling.</LI>
* <LI>Calling overhead is significant for reading the RTC or sleeping.</LI>
* <LI>Controlling the accuracy of a delay is not possible under any level of load.</LI>
* <LI>It is undesirable (wasteful) to use spin loops to delay.</LI>
* </OL>
*
* Together, these factors mean a compromise is inevitable. In practice it means that a very accurate
* implementation will likely be very slow, and a very fast implementation will likely be very inaccurate.
* This implementation tries to strike a balance, providing accuracy near the microsecond level,
* while allowing rates in the tens of millions per second, even under heavy thread contention.
*
* <H2>Burst Ratio</H2>
* <p>
* This rate limiter provides a sliding scale between strict rate limiting and average rate limiting,
* the difference between the two controlled by a <em>burst ratio</em> parameter. When the burst
* ratio is 1.0, the rate limiter acts as a strict rate limiter, disallowing faster operations
* from using time that was previously forfeited by prior slower operations. This is a "use it
* or lose it" mode that means things like GC events can steal throughput from a running client
* as a necessary effect of losing time in a strict timing sense.
* </p>
*
* <p>
* When the burst ratio is set to higher than 1.0, faster operations may recover lost time from
* previously slower operations. This means that any valleys created in the actual op rate of the
* client can be converted into plateaus of throughput above the strict rate, but only at a speed that
* fits within (op rate * burst ratio). This allows for workloads to approximate the average
* target rate over time, with controllable bursting rates. This ability allows for near-strict
* behavior while allowing clients to still track truer to rate limit expectations, so long as the
* overall workload is not saturating resources.
* </p>
*/
@Service(value = RateLimiter.class, selector = "hybrid")
public class HybridRateLimiter extends NBBaseComponent implements RateLimiter {
private static final Logger logger = LogManager.getLogger(HybridRateLimiter.class);
private NBLabeledElement named;
//private volatile TokenFiller filler;
private volatile long starttime;
// rate controls
private RateSpec rateSpec;
// basic state
private String label;
private State state = State.Idle;
// metrics
private Gauge<Double> delayGauge;
private Gauge<Double> avgRateGauge;
private Gauge<Double> burstRateGauge;
private TokenPool tokens;
// diagnostics
// TODO Doc rate limiter scenarios, including when you want to reset the waittime, and when you don't
private final AtomicLong cumulativeWaitTimeNanos = new AtomicLong(0L);
protected HybridRateLimiter(NBComponent parent) {
super(parent);
}
public HybridRateLimiter(final NBComponent parent, final String label, final RateSpec rateSpec) {
super(parent);
this.label = label;
this.init(named);
applyRateSpec(rateSpec);
}
protected void setLabel(final String label) {
this.label = label;
}
@Override
public long maybeWaitForOp() {
return this.tokens.blockAndTake();
}
@Override
public long getTotalWaitTime() {
return cumulativeWaitTimeNanos.get() + this.getWaitTime();
}
@Override
public long getWaitTime() {
return this.tokens.getWaitTime();
}
@Override
public RateSpec getRateSpec() {
return rateSpec;
}
@Override
public synchronized void applyRateSpec(final RateSpec updatingRateSpec) {
if (null == updatingRateSpec) throw new RuntimeException("RateSpec must be defined");
if (updatingRateSpec.equals(rateSpec) && !updatingRateSpec.isRestart()) return;
rateSpec = updatingRateSpec;
tokens = null == this.tokens ? new ThreadDrivenTokenPool(this,this.rateSpec, this.named) : tokens.apply(this.named, this.rateSpec);
// this.filler = (this.filler == null) ? new TokenFiller(rateSpec, activityDef) : filler.apply(rateSpec);
// this.tokens = this.filler.getTokenPool();
if ((State.Idle == this.state) && updatingRateSpec.isAutoStart()) start();
else if (updatingRateSpec.isRestart()) restart();
}
protected void init(final NBLabeledElement activityDef) {
delayGauge = create().gauge( this.label + "_waittime", () -> (double)getTotalWaitTime());
avgRateGauge = create().gauge(this.label + "_config_cyclerate", () -> getRateSpec().opsPerSec);
burstRateGauge = create().gauge(this.label + "_config_burstrate", () -> getRateSpec().getBurstRatio() * getRateSpec().getRate());
}
@Override
public synchronized void start() {
switch (this.state) {
case Started:
// logger.warn("Tried to start a rate limiter that was already started. If this is desired, use restart() instead");
// TODO: Find a better way to warn about spurious rate limiter
// starts, since the check condition was not properly isolated
break;
case Idle:
final long nanos = this.getNanoClockTime();
starttime = nanos;
tokens.start();
this.state = State.Started;
break;
}
}
public synchronized long restart() {
switch (this.state) {
case Idle:
start();
return 0L;
case Started:
final long accumulatedWaitSinceLastStart = this.cumulativeWaitTimeNanos.get();
this.cumulativeWaitTimeNanos.set(0L);
return tokens.restart() + accumulatedWaitSinceLastStart;
default:
return 0L;
}
}
@Override
public long getStartTime() {
return 0;
}
private synchronized void checkpointCumulativeWaitTime() {
final long nanos = this.getNanoClockTime();
starttime = nanos;
this.cumulativeWaitTimeNanos.addAndGet(this.getWaitTime());
}
protected long getNanoClockTime() {
return System.nanoTime();
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(HybridRateLimiter.class.getSimpleName());
sb.append("{\n");
if (null != this.getRateSpec()) sb.append(" spec:").append(rateSpec.toString());
if (null != this.tokens) sb.append(",\n tokenpool:").append(tokens);
if (null != this.state) sb.append(",\n state:'").append(state).append('\'');
sb.append("\n}");
return sb.toString();
}
// public String getRefillLog() {
// return this.filler.getRefillLog();
// }
private enum State {
Idle,
Started
}
private class PoolGauge implements Gauge<Long> {
private final HybridRateLimiter rl;
public PoolGauge(final HybridRateLimiter hybridRateLimiter) {
rl = hybridRateLimiter;
}
@Override
public Long getValue() {
final TokenPool pool = this.rl.tokens;
if (null == pool) return 0L;
return pool.getWaitTime();
}
}
}
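To make the burst-ratio description above concrete, here is a small self-contained calculation (not part of this commit) of the per-op time and the catch-up ceiling implied by a rate and burst ratio; the numbers are arbitrary examples.

    // Hedged illustration of the burst-ratio semantics documented above.
    public final class BurstRatioExample {
        public static void main(String[] args) {
            double opsPerSec = 1_000d;                               // example target rate
            double burstRatio = 1.5d;                                // example burst ratio
            long nanosPerOp = (long) (1_000_000_000d / opsPerSec);   // 1_000_000 ns per op
            double catchUpCeiling = opsPerSec * burstRatio;          // lost time may be recovered,
                                                                     // but never faster than this
            System.out.printf("nanosPerOp=%,d ns, catch-up ceiling=%.0f ops/s%n",
                    nanosPerOp, catchUpCeiling);
        }
    }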

View File

@ -1,365 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
import com.codahale.metrics.Timer;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.api.util.Colors;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel.MapMode;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
/**
* <h2>Synopsis</h2>
*
* This TokenPool represents a finite quantity which can be
* replenished with regular refills. Extra tokens that do not fit
* within the active token pool are saved in a waiting token pool and
* used to backfill when allowed according to the backfill rate.
*
* A detailed explanation for how this works will be included
* at @link "http://docs.nosqlbench.io/" under dev notes.
*
* <p>This is the basis for the token-based rate limiters in
* NB. This mechanism is easily adaptable to bursting
* capability as well as a degree of stricter timing at speed.
* Various methods for doing this in a lock free way were
* investigated, but the intrinsic locks provided by synchronized
* methods won out for now. This may be revisited when EB is
* retrofitted for J11.
* </p>
*/
public class InlineTokenPool {
private static final Logger logger = LogManager.getLogger(InlineTokenPool.class);
public static final double MIN_CONCURRENT_OPS = 5;
private final NBComponent parent;
// Size limit of active pool
private long maxActivePoolSize;
// Size limit of burst pool incremental above active pool
private long maxBurstPoolSize;
// Size limit of total active tokens which can be waiting in active pool, considering burst
private long maxActiveAndBurstSize;
// Ratio of speed relative to base speed at which bursting is allowed
private double burstRatio;
// TODO Consider removing volatile after investigating
// The active number of tokens (ns) available for consumers
private volatile long activePool;
// The tokens which were not claimed on time and were moved into the waiting (reserve) pool
private volatile long waitingPool;
// How many tokens (ns) represent passage of time for a single op, given the op rate
private long nanosPerOp;
// The nanotime of the last refill
private volatile long lastRefillAt;
// metrics for refill
private final Timer refillTimer;
// update rate for refiller
private final long interval = (long) 1.0E6;
private RateSpec rateSpec;
// private long debugTrigger=0L;
// private long debugRate=1000000000;
// Total number of thread blocks that occurred since this token pool was started
private long blocks;
private final Lock lock = new ReentrantLock();
private final Condition lockheld = this.lock.newCondition();
/**
* This constructor tries to pick reasonable defaults for the token pool for
* a given rate spec. The active pool must be large enough to contain one
* op worth of time, and the burst ratio determines how much extra capacity
* is allowed on top of that.
*
* @param rateSpec a {@link RateSpec}
*/
public InlineTokenPool(final RateSpec rateSpec, final ActivityDef def, final NBComponent parent) {
this.parent = parent;
final ByteBuffer logbuf = this.getBuffer();
this.apply(rateSpec);
InlineTokenPool.logger.debug("initialized token pool: {} for rate:{}", this, rateSpec);
refillTimer = parent.create().timer("tokenfiller",4);
}
public InlineTokenPool(final long poolsize, final double burstRatio, final ActivityDef def, final NBComponent parent) {
this.parent = parent;
final ByteBuffer logbuf = this.getBuffer();
maxActivePoolSize = poolsize;
this.burstRatio = burstRatio;
maxActiveAndBurstSize = (long) (this.maxActivePoolSize * burstRatio);
maxBurstPoolSize = this.maxActiveAndBurstSize - this.maxActivePoolSize;
refillTimer = parent.create().timer( "tokenfiller",4);
}
/**
* Change the settings of this token pool, and wake any blocked callers
* just in case it allows them to proceed.
*
* @param rateSpec The rate specifier.
*/
public synchronized void apply(final RateSpec rateSpec) {
this.rateSpec = rateSpec;
// maxActivePoolSize is set to the higher of 1M or however many nanos are needed for MIN_CONCURRENT_OPS ops to be buffered
maxActivePoolSize = Math.max((long) 1.0E6, (long) (rateSpec.getNanosPerOp() * InlineTokenPool.MIN_CONCURRENT_OPS));
maxActiveAndBurstSize = (long) (this.maxActivePoolSize * rateSpec.getBurstRatio());
burstRatio = rateSpec.getBurstRatio();
maxBurstPoolSize = this.maxActiveAndBurstSize - this.maxActivePoolSize;
nanosPerOp = rateSpec.getNanosPerOp();
this.notifyAll();
}
public double getBurstRatio() {
return this.burstRatio;
}
/**
* Take up to amt tokens from the pool and report
* the number of tokens removed.
*
* @param amt tokens requested
* @return actual number of tokens removed, greater than or equal to zero
*/
public synchronized long takeUpTo(final long amt) {
final long take = Math.min(amt, this.activePool);
this.activePool -= take;
return take;
}
/**
* wait for the given number of tokens to be available, and then remove
* them from the pool.
*
* @return the total number of tokens untaken, including wait tokens
*/
public long blockAndTake() {
synchronized (this) {
if (this.activePool >= this.nanosPerOp) {
this.activePool -= this.nanosPerOp;
return this.waitingPool + this.activePool;
}
}
while (true) if (this.lock.tryLock()) try {
while (this.activePool < this.nanosPerOp) this.dorefill();
this.lockheld.signal();
this.lockheld.signal();
} finally {
this.lock.unlock();
}
else try {
this.lockheld.await();
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
// while (activePool < nanosPerOp) {
// blocks++;
// //System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
// try {
// wait();
//// wait(maxActivePoolSize / 1000000, (int) maxActivePoolSize % 1000000);
// } catch (InterruptedException ignored) {
// } catch (Exception e) {
// throw new RuntimeException(e);
// }
// //System.out.println("waited for " + amt + "/" + activePool + " tokens");
// }
// //System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
//
// activePool -= nanosPerOp;
// return waitingPool + activePool;
}
public synchronized long blockAndTakeOps(final long ops) {
final long totalNanosNeeded = ops * this.nanosPerOp;
while (this.activePool < totalNanosNeeded) {
this.blocks++;
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
try {
this.wait();
// wait(maxActivePoolSize / 1000000, (int) maxActivePoolSize % 1000000);
} catch (final InterruptedException ignored) {
} catch (final Exception e) {
throw new RuntimeException(e);
}
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
}
//System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
this.activePool -= totalNanosNeeded;
return this.waitingPool + this.activePool;
}
public synchronized long blockAndTake(final long tokens) {
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
while (this.activePool < tokens) try {
this.wait();
// wait(maxActivePoolSize / 1000000, (int) maxActivePoolSize % 1000000);
} catch (final InterruptedException ignored) {
} catch (final Exception e) {
throw new RuntimeException(e);
}
//System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
this.activePool -= tokens;
return this.waitingPool + this.activePool;
}
public long getWaitTime() {
return this.activePool + this.waitingPool;
}
public long getWaitPool() {
return this.waitingPool;
}
public long getActivePool() {
return this.activePool;
}
/**
* Add the given number of new tokens to the pool, forcing any amount
* that would spill over the current pool size into the wait token pool, but
* moving up to the configured burst tokens back from the wait token pool
* otherwise.
*
* The amount of backfilling that occurs is controlled by the backfill ratio,
* based on the number of tokens submitted. This normalizes the
* backfilling rate to the fill rate, so that it is not sensitive to refill
* scheduling.
*
* @param newTokens The number of new tokens to add to the token pools
* @return the total number of tokens in all pools
*/
public synchronized long refill(final long newTokens) {
final boolean debugthis = false;
// long debugAt = System.nanoTime();
// if (debugAt>debugTrigger+debugRate) {
// debugTrigger=debugAt;
// debugthis=true;
// }
final long needed = Math.max(this.maxActivePoolSize - this.activePool, 0L);
final long allocatedToActivePool = Math.min(newTokens, needed);
this.activePool += allocatedToActivePool;
// overflow logic
final long allocatedToOverflowPool = newTokens - allocatedToActivePool;
this.waitingPool += allocatedToOverflowPool;
// backfill logic
final double refillFactor = Math.min((double) newTokens / this.maxActivePoolSize, 1.0D);
long burstFillAllowed = (long) (refillFactor * this.maxBurstPoolSize);
burstFillAllowed = Math.min(this.maxActiveAndBurstSize - this.activePool, burstFillAllowed);
final long burstFill = Math.min(burstFillAllowed, this.waitingPool);
this.waitingPool -= burstFill;
this.activePool += burstFill;
if (debugthis) {
System.out.print(this);
System.out.print(Colors.ANSI_BrightBlue + " adding=" + allocatedToActivePool);
if (0 < allocatedToOverflowPool)
System.out.print(Colors.ANSI_Red + " OVERFLOW:" + allocatedToOverflowPool + Colors.ANSI_Reset);
if (0 < burstFill) System.out.print(Colors.ANSI_BrightGreen + " BACKFILL:" + burstFill + Colors.ANSI_Reset);
System.out.println();
}
//System.out.println(this);
this.notifyAll();
return this.activePool + this.waitingPool;
}
@Override
public String toString() {
return "Tokens: active=" + this.activePool + '/' + this.maxActivePoolSize
+ String.format(
" (%3.1f%%)A (%3.1f%%)B ",
(double) this.activePool / this.maxActivePoolSize * 100.0,
(double) this.activePool / this.maxActiveAndBurstSize * 100.0) + " waiting=" + this.waitingPool +
" blocks=" + this.blocks +
" rateSpec:" + (null != rateSpec ? this.rateSpec.toString() : "NULL");
}
public RateSpec getRateSpec() {
return this.rateSpec;
}
public synchronized long restart() {
final long wait = this.activePool + this.waitingPool;
this.activePool = 0L;
this.waitingPool = 0L;
return wait;
}
private ByteBuffer getBuffer() {
RandomAccessFile image = null;
try {
image = new RandomAccessFile("tokenbucket.binlog", "rw");
final ByteBuffer mbb = image.getChannel().map(MapMode.READ_WRITE, 0, image.length());
return mbb;
} catch (final Exception e) {
throw new RuntimeException(e);
}
}
public synchronized void dorefill() {
this.lastRefillAt = System.nanoTime();
final long nextRefillTime = this.lastRefillAt + this.interval;
long thisRefillTime = System.nanoTime();
while (thisRefillTime < nextRefillTime) {
// while (thisRefillTime < lastRefillAt + interval) {
final long parkfor = Math.max(nextRefillTime - thisRefillTime, 0L);
//System.out.println(ANSI_Blue + "parking for " + parkfor + "ns" + ANSI_Reset);
LockSupport.parkNanos(parkfor);
thisRefillTime = System.nanoTime();
}
// this.times[iteration]=thisRefillTime;
final long delta = thisRefillTime - this.lastRefillAt;
// this.amounts[iteration]=delta;
this.lastRefillAt = thisRefillTime;
//System.out.println(this);
this.refill(delta);
this.refillTimer.update(delta, TimeUnit.NANOSECONDS);
// iteration++;
}
}
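The refill()/backfill arithmetic above is the core of this (now removed) pool. A condensed, self-contained sketch of the same logic follows; field values are chosen only for illustration and do not come from this commit.

    // Condensed sketch of the spill-over and normalized backfill logic from refill() above.
    final class PoolRefillSketch {
        long maxActivePoolSize = 1_000_000L;                         // ~1ms of nanos
        long maxActiveAndBurstSize = 1_100_000L;                     // active * burstRatio (1.1)
        long maxBurstPoolSize = maxActiveAndBurstSize - maxActivePoolSize;
        long activePool, waitingPool;

        synchronized long refill(long newTokens) {
            long needed = Math.max(maxActivePoolSize - activePool, 0L);
            long toActive = Math.min(newTokens, needed);             // fill active pool first
            activePool += toActive;
            waitingPool += newTokens - toActive;                     // spill-over to waiting pool
            double refillFactor = Math.min((double) newTokens / maxActivePoolSize, 1.0d);
            long burstAllowed = Math.min((long) (refillFactor * maxBurstPoolSize),
                    maxActiveAndBurstSize - activePool);             // normalized to the fill rate
            long burstFill = Math.min(burstAllowed, waitingPool);    // backfill from waiting pool
            waitingPool -= burstFill;
            activePool += burstFill;
            return activePool + waitingPool;
        }
    }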

View File

@ -1,113 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.engine.api.activityapi.sysperf.SysPerf;
import io.nosqlbench.engine.api.activityapi.sysperf.SysPerfData;
import java.util.concurrent.locks.LockSupport;
public class LeastWorstDelay {
public final static SysPerfData perfdata = SysPerf.get().getPerfData(false);
//private final static long sleepThreshold = (long) perfdata
//.getAvgNanos_Thread_Sleep();
//private final static long parkThreshold = (long) perfdata
//.getAvgNanos_LockSupport_ParkNanos();
private final static long sleepThreshold = 1_000_000;
private final static long parkThreshold = 20;
/**
* We wish for the JVM to inline this.
*
* This method tries to block a thread for a period of time, with a balance of
* accuracy and calling overhead. It does this by estimating the best way to
* block according to the time to block for and some knowledge of how much
* overhead and accuracy each method of blocking has. It's not perfect, but it
* is marginally better than a CPU burning busy wait or a throughput killing sleep
* right in the middle of every single thread.
*
* A better implementation would use sparse sampling of effective accuracy
* and feedback into the offsets, to deal with variability in CPU availability.
*
* @param nanos nanoseconds to delay for
*/
public static void delayAsIfFor(long nanos) {
if (nanos > 0) {
if (nanos > sleepThreshold) {
nanos -= sleepThreshold;
try {
Thread.sleep((nanos / 1000000), (int) (nanos % 1000000));
} catch (InterruptedException ignored) {
}
} else if (nanos > parkThreshold) {
nanos -= parkThreshold;
LockSupport.parkNanos(nanos);
}
}
}
public static void debugDelayAsIfFor(long nanos) {
if (nanos > 0) {
if (nanos > sleepThreshold) {
try {
System.out.printf("sleeping for %.9fS%n", ((double) nanos / 1E9));
Thread.sleep((nanos / 1000000), (int) (nanos % 1000000));
} catch (InterruptedException ignored) {
}
} else if (nanos > parkThreshold) {
System.out.printf("parking for %.9fS%n", ((double) nanos / 1E9));
LockSupport.parkNanos(nanos);
}
}
}
/**
* This method has a quirky name, because it does something a bit quirky.
*
* Inject delay, but do not await a condition that the delay is accurate
* according to the real time clock. Return the presumed real time clock
* value after the delay.
*
* This method is meant to provide lightweight delay when accuracy is not
* as important as efficiency, and where the jitter in the result will not
* result in an error that accumulates. Users must be careful to avoid using
* this method in other scenarios.
*
* @param targetNanoTime The system nanos that the delay should attempt to return at, assuming
*                       perfect accuracy, which doesn't happen in practice
*/
public void delayAsIfUntil(long targetNanoTime) {
long nanos = Math.max(targetNanoTime - System.nanoTime(), 0L);
if (nanos > 0) {
if (nanos > sleepThreshold) {
nanos -= sleepThreshold;
try {
Thread.sleep((nanos / 1000000), (int) (nanos % 1000000));
} catch (InterruptedException ignored) {
}
} else if (nanos > parkThreshold) {
nanos -= parkThreshold;
LockSupport.parkNanos(nanos);
} // else there is nothing shorter than this besides spinning, and we're not doing that
}
}
}
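The threshold logic above picks the cheapest blocking primitive for a given delay. A minimal standalone sketch of the same decision, using the hard-coded thresholds from this file rather than measured ones, is:

    import java.util.concurrent.locks.LockSupport;

    // Minimal sketch: choose the blocking primitive by how long we need to wait.
    final class DelaySketch {
        static final long SLEEP_THRESHOLD_NS = 1_000_000L;  // above ~1ms, Thread.sleep is acceptable
        static final long PARK_THRESHOLD_NS = 20L;          // below this, any call costs more than it saves

        static void delayRoughly(long nanos) {
            if (nanos > SLEEP_THRESHOLD_NS) {
                try {
                    Thread.sleep(nanos / 1_000_000, (int) (nanos % 1_000_000));
                } catch (InterruptedException ignored) {
                }
            } else if (nanos > PARK_THRESHOLD_NS) {
                LockSupport.parkNanos(nanos);
            } // else: return immediately rather than spin
        }
    }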

View File

@ -16,23 +16,25 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.engine.api.activityapi.core.Startable;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRateSpec;
public interface RateLimiter extends Startable {
import java.time.Duration;
public interface RateLimiter {
/**
* Block until it is time for the next operation, according to the
* nanoseconds per op as set by {@link #applyRateSpec(RateSpec)}
* <P>Block until it is time for the next operation, according to the
* nanoseconds per op as set by {@link #applyRateSpec(SimRateSpec)}
* @return the waittime as nanos behind schedule when this op returns.
* The returned value is required to be greater than or equal to zero.
* The returned value is required to be greater than or equal to zero.</P>
*
* Note that accuracy of the returned value is limited by timing
* <P>Note that accuracy of the returned value is limited by timing
* precision and calling overhead of the real time clock. It will not
* generally be better than microseconds. Also, some rate limiting
* algorithms are unable to efficiently track per-op waittime at speed
* due to bulk allocation mechanisms necessary to support higher rates.
* due to bulk allocation mechanisms necessary to support higher rates.</P>
*/
long maybeWaitForOp();
long block();
/**
* Return the total number of nanoseconds behind schedule
@ -41,7 +43,7 @@ public interface RateLimiter extends Startable {
* an accumulator and also included in any subsequent measurement.
* @return nanoseconds behind schedule since the rate limiter was started
*/
long getTotalWaitTime();
Duration getTotalWaitTimeDuration();
/**
* Return the total number of nanoseconds behind schedule
@ -50,13 +52,16 @@ public interface RateLimiter extends Startable {
* an accumulator and also included in any subsequent measurement.
* @return nanoseconds behind schedule since the rate limiter was started
*/
long getWaitTime();
Duration getWaitTimeDuration();
double getWaitTimeSeconds();
/**
* Modify the rate of a running rate limiter.
* @param spec The rate and burstRatio specification
*/
void applyRateSpec(RateSpec spec);
void applyRateSpec(SimRateSpec spec);
/**
@ -70,6 +75,6 @@ public interface RateLimiter extends Startable {
* Get the rate spec that this rate limiter was created from.
* @return a RateSpec that describes this rate limiter
*/
RateSpec getRateSpec();
SimRateSpec getSpec();
}
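A hypothetical caller of the revised interface might look like the sketch below; only block() and getTotalWaitTimeDuration() are taken from the interface above, the rest is illustrative.

    import java.time.Duration;

    // Illustrative caller: gate each op with block(), then report schedule slip as a Duration.
    final class RateLimitedLoop {
        static void run(RateLimiter limiter, Runnable op, long count) {
            for (long i = 0; i < count; i++) {
                limiter.block();                         // waits until this op may start
                op.run();
            }
            Duration behind = limiter.getTotalWaitTimeDuration();
            System.out.printf("%,d ops finished %.3fs behind schedule%n",
                    count, behind.toNanos() / 1e9);
        }
    }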

View File

@ -17,8 +17,9 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import com.codahale.metrics.Gauge;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRate;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRateSpec;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -26,10 +27,10 @@ public enum RateLimiters {
;
private static final Logger logger = LogManager.getLogger(RateLimiters.class);
public static synchronized RateLimiter createOrUpdate(final NBComponent parent, final String label, final RateLimiter extant, final RateSpec spec) {
public static synchronized RateLimiter createOrUpdate(final NBComponent parent, final RateLimiter extant, final SimRateSpec spec) {
if (null == extant) {
final RateLimiter rateLimiter= new HybridRateLimiter(parent, label, spec);
final RateLimiter rateLimiter= new SimRate(parent, spec);
RateLimiters.logger.info(() -> "Using rate limiter: " + rateLimiter);
return rateLimiter;
@ -40,47 +41,8 @@ public enum RateLimiters {
}
public static synchronized RateLimiter create(final NBComponent def, final String label, final String specString) {
return RateLimiters.createOrUpdate(def, label, null, new RateSpec(specString));
return RateLimiters.createOrUpdate(def, null, new SimRateSpec(specString));
}
public static class WaitTimeGauge implements Gauge<Double> {
private final RateLimiter rateLimiter;
public WaitTimeGauge(final RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}
@Override
public Double getValue() {
return (double)this.rateLimiter.getTotalWaitTime();
}
}
public static class RateGauge implements Gauge<Double> {
private final RateLimiter rateLimiter;
public RateGauge(final RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}
@Override
public Double getValue() {
return this.rateLimiter.getRateSpec().opsPerSec;
}
}
public static class BurstRateGauge implements Gauge<Double> {
private final RateLimiter rateLimiter;
public BurstRateGauge(final RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}
@Override
public Double getValue() {
return this.rateLimiter.getRateSpec().getBurstRatio() * this.rateLimiter.getRateSpec().getRate();
}
}
}
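The wait-time, rate, and burst-rate gauges removed above were thin wrappers over the old long-valued accessors. If an equivalent were still wanted, a sketch against the new Duration-based accessor could look like the following; nothing in this commit registers such a gauge.

    import com.codahale.metrics.Gauge;

    // Hedged sketch only: expose total wait time in seconds via the new Duration accessor.
    final class WaitTimeSecondsGauge implements Gauge<Double> {
        private final RateLimiter limiter;
        WaitTimeSecondsGauge(RateLimiter limiter) { this.limiter = limiter; }

        @Override
        public Double getValue() {
            return limiter.getTotalWaitTimeDuration().toNanos() / 1_000_000_000d;
        }
    }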

View File

@ -1,263 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import static io.nosqlbench.engine.api.util.Colors.*;
/**
* <h2>Synopsis</h2>
*
* This TokenPool represents a finite quantity which can be
* replenished with regular refills. Extra tokens that do not fit
* within the active token pool are saved in a waiting token pool and
* used to backfill when allowed according to the backfill rate.
*
* A detailed explanation for how this works will be included
* at @link "http://docs.nosqlbench.io/" under dev notes.
*
* <p>This is the basis for the token-based rate limiters in
* NB. This mechanism is easily adaptable to bursting
* capability as well as a degree of stricter timing at speed.
* Various methods for doing this in a lock free way were
* investigated, but the intrinsic locks provided by synchronized
* methods won out for now. This may be revisited when EB is
* retrofitted for J11.
* </p>
*/
@Service(value= TokenPool.class, selector="threaded")
public class ThreadDrivenTokenPool implements TokenPool {
private static final Logger logger = LogManager.getLogger(ThreadDrivenTokenPool.class);
public static final double MIN_CONCURRENT_OPS = 2;
private final NBComponent parent;
private long maxActivePool;
private long burstPoolSize;
private long maxOverActivePool;
private double burstRatio;
// TODO Consider removing volatile after investigating
private volatile long activePool;
private volatile long waitingPool;
private RateSpec rateSpec;
private long nanosPerOp;
private long blocks;
private TokenFiller filler;
/**
* This constructor tries to pick reasonable defaults for the token pool for
* a given rate spec. The active pool must be large enough to contain one
* op worth of time, and the burst ratio determines how much extra capacity
* is allowed on top of that.
*
* @param rateSpec a {@link RateSpec}
*/
public ThreadDrivenTokenPool(NBComponent parent, final RateSpec rateSpec, final NBLabeledElement named) {
this.parent = parent;
this.apply(named,rateSpec);
ThreadDrivenTokenPool.logger.debug(() -> "initialized token pool: " + this + " for rate:" + rateSpec);
// filler.start();
}
/**
* Change the settings of this token pool, and wake any blocked callers
* just in case it allows them to proceed.
*
* @param rateSpec The rate specifier.
*/
@Override
public synchronized TokenPool apply(final NBLabeledElement labeled, final RateSpec rateSpec) {
this.rateSpec = rateSpec;
maxActivePool = Math.max((long) 1.0E6, (long) (rateSpec.getNanosPerOp() * ThreadDrivenTokenPool.MIN_CONCURRENT_OPS));
maxOverActivePool = (long) (this.maxActivePool * rateSpec.getBurstRatio());
burstRatio = rateSpec.getBurstRatio();
burstPoolSize = this.maxOverActivePool - this.maxActivePool;
nanosPerOp = rateSpec.getNanosPerOp();
filler = null == this.filler ? new TokenFiller(parent, rateSpec, this, labeled, 3) : this.filler.apply(rateSpec);
this.notifyAll();
return this;
}
@Override
public double getBurstRatio() {
return this.burstRatio;
}
/**
* Take up to amt tokens from the pool and report
* the number of tokens removed.
*
* @param amt tokens requested
* @return actual number of tokens removed, greater than or equal to zero
*/
@Override
public synchronized long takeUpTo(final long amt) {
final long take = Math.min(amt, this.activePool);
this.activePool -= take;
return take;
}
/**
* wait for the given number of tokens to be available, and then remove
* them from the pool.
*
* @return the total number of tokens untaken, including wait tokens
*/
@Override
public synchronized long blockAndTake() {
while (this.activePool < this.nanosPerOp) {
this.blocks++;
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
try {
this.wait(1000);
// wait(maxActivePool / 1000000, 0);
} catch (final InterruptedException ignored) {
} catch (final Exception e) {
throw new RuntimeException(e);
}
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
}
//System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
this.activePool -= this.nanosPerOp;
return this.waitingPool + this.activePool;
}
@Override
public synchronized long blockAndTake(final long tokens) {
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
while (this.activePool < tokens) try {
this.wait(this.maxActivePool / 1000000, (int) this.maxActivePool % 1000000);
} catch (final InterruptedException ignored) {
} catch (final Exception e) {
throw new RuntimeException(e);
}
//System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
this.activePool -= tokens;
return this.waitingPool + this.activePool;
}
@Override
public long getWaitTime() {
return this.activePool + this.waitingPool;
}
@Override
public long getWaitPool() {
return this.waitingPool;
}
@Override
public long getActivePool() {
return this.activePool;
}
/**
* Add the given number of new tokens to the pool, forcing any amount
* that would spill over the current pool size into the wait token pool, but
* moving up to the configured burst tokens back from the wait token pool
* otherwise.
*
* The amount of backfilling that occurs is controlled by the backfill ratio,
* based on the number of tokens submitted. This normalizes the
* backfilling rate to the fill rate, so that it is not sensitive to refill
* scheduling.
*
* @param newTokens The number of new tokens to add to the token pools
* @return the total number of tokens in all pools
*/
public synchronized long refill(final long newTokens) {
final boolean debugthis = false;
// long debugAt = System.nanoTime();
// if (debugAt>debugTrigger+debugRate) {
// debugTrigger=debugAt;
// debugthis=true;
// }
final long needed = Math.max(this.maxActivePool - this.activePool, 0L);
final long allocatedToActivePool = Math.min(newTokens, needed);
this.activePool += allocatedToActivePool;
// overflow logic
final long allocatedToOverflowPool = newTokens - allocatedToActivePool;
this.waitingPool += allocatedToOverflowPool;
// backfill logic
final double refillFactor = Math.min((double) newTokens / this.maxActivePool, 1.0D);
long burstFillAllowed = (long) (refillFactor * this.burstPoolSize);
burstFillAllowed = Math.min(this.maxOverActivePool - this.activePool, burstFillAllowed);
final long burstFill = Math.min(burstFillAllowed, this.waitingPool);
this.waitingPool -= burstFill;
this.activePool += burstFill;
if (debugthis) {
System.out.print(this);
System.out.print(ANSI_BrightBlue + " adding=" + allocatedToActivePool);
if (0 < allocatedToOverflowPool)
System.out.print(ANSI_Red + " OVERFLOW:" + allocatedToOverflowPool + ANSI_Reset);
if (0 < burstFill) System.out.print(ANSI_BrightGreen + " BACKFILL:" + burstFill + ANSI_Reset);
System.out.println();
}
//System.out.println(this);
this.notifyAll();
return this.activePool + this.waitingPool;
}
@Override
public String toString() {
return String.format(
"{ active:%d, max:%d, fill:'(%,3.1f%%)A (%,3.1f%%)B', wait_ns:%,d, blocks:%,d }",
this.activePool, this.maxActivePool,
(double) this.activePool / this.maxActivePool * 100.0,
(double) this.activePool / this.maxOverActivePool * 100.0,
this.waitingPool,
this.blocks
);
}
@Override
public RateSpec getRateSpec() {
return this.rateSpec;
}
@Override
public synchronized long restart() {
final long wait = this.activePool + this.waitingPool;
this.activePool = 0L;
this.waitingPool = 0L;
return wait;
}
@Override
public synchronized void start() {
this.filler.start();
}
}
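The pool sizing performed in apply() above reduces to a few lines of arithmetic. The sketch below reproduces it standalone; MIN_CONCURRENT_OPS = 2 as in this class, and the inputs are example values only.

    // Standalone sketch of the sizing rule in apply(): at least 1ms of nanos, or enough for
    // MIN_CONCURRENT_OPS ops, whichever is larger, with burst capacity layered on top.
    final class PoolSizingSketch {
        static void size(double opsPerSec, double burstRatio) {
            long nanosPerOp = (long) (1_000_000_000d / opsPerSec);
            long maxActivePool = Math.max(1_000_000L, nanosPerOp * 2L);      // MIN_CONCURRENT_OPS = 2
            long maxOverActivePool = (long) (maxActivePool * burstRatio);
            long burstPoolSize = maxOverActivePool - maxActivePool;
            System.out.printf("active=%,d over=%,d burst=%,d (ns)%n",
                    maxActivePool, maxOverActivePool, burstPoolSize);
        }
    }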

View File

@ -1,129 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
import com.codahale.metrics.Timer;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.components.NBComponent;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class TokenFiller implements Runnable {
private static final Logger logger = LogManager.getLogger(TokenFiller.class);
public static final double MIN_PER_SECOND = 10.0D;
public static final double MAX_PER_SECOND = 1000.0D;
// private final SysPerfData PERFDATA = SysPerf.get().getPerfData
// (false);
private final long interval = (long) 1.0E5;
private final ThreadDrivenTokenPool tokenPool;
private volatile boolean running = true;
private RateSpec rateSpec;
private Thread thread;
private volatile long lastRefillAt;
private final Timer timer;
/**
* A token filler adds tokens to a {@link ThreadDrivenTokenPool} at some rate.
* By default, tokens are added at least once every millisecond, plus or minus scheduling jitter
* in the JVM.
*
*/
public TokenFiller(NBComponent parent, final RateSpec rateSpec, final ThreadDrivenTokenPool tokenPool, final NBLabeledElement labeled, final int hdrdigits) {
this.rateSpec = rateSpec;
this.tokenPool = tokenPool;
timer = parent.create().timer("tokenfiller",3);
}
public TokenFiller apply(final RateSpec rateSpec) {
this.rateSpec = rateSpec;
return this;
}
private void stop() {
running=false;
}
public TokenPool getTokenPool() {
return this.tokenPool;
}
@Override
public void run() {
this.lastRefillAt = System.nanoTime();
while (this.running) {
final long nextRefillTime = this.lastRefillAt + this.interval;
long thisRefillTime = System.nanoTime();
while (thisRefillTime < nextRefillTime) {
// while (thisRefillTime < lastRefillAt + interval) {
final long parkfor = Math.max(nextRefillTime - thisRefillTime, 0L);
// System.out.println(ANSI_Blue + " parking for " + parkfor + "ns" + ANSI_Reset); System.out.flush();
LockSupport.parkNanos(parkfor);
// System.out.println(ANSI_Blue + "unparking for " + parkfor + "ns" + ANSI_Reset); System.out.flush();
thisRefillTime = System.nanoTime();
}
// this.times[iteration]=thisRefillTime;
final long delta = thisRefillTime - this.lastRefillAt;
// this.amounts[iteration]=delta;
this.lastRefillAt = thisRefillTime;
// System.out.println(ANSI_Blue + this + ANSI_Reset); System.out.flush();
this.tokenPool.refill(delta);
this.timer.update(delta, TimeUnit.NANOSECONDS);
// iteration++;
}
}
public synchronized TokenFiller start() {
tokenPool.refill(this.rateSpec.getNanosPerOp());
this.thread = new Thread(this);
this.thread.setName(toString());
this.thread.setPriority(Thread.MAX_PRIORITY);
this.thread.setDaemon(true);
this.thread.start();
TokenFiller.logger.debug("Starting token filler thread: {}", this);
return this;
}
@Override
public String toString() {
return "TokenFiller spec=" + this.rateSpec + " interval=" + interval + "ns pool:" + this.tokenPool +" running=" + this.running;
}
// public String getRefillLog() {
// StringBuilder sb = new StringBuilder();
// for (int iter = 0; iter < iteration; iter++) {
// sb.append(times[iter]).append(" ").append(amounts[iter]).append("\n");
// }
// return sb.toString();
// }
public synchronized long restart() {
lastRefillAt=System.nanoTime();
TokenFiller.logger.debug("Restarting token filler at {} thread: {}", this.lastRefillAt, this);
final long wait = tokenPool.restart();
return wait;
}
}

View File

@ -1,44 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.labels.NBLabeledElement;
public interface TokenPool {
TokenPool apply(NBLabeledElement labeled, RateSpec rateSpec);
double getBurstRatio();
long takeUpTo(long amt);
long blockAndTake();
long blockAndTake(long tokens);
long getWaitTime();
long getWaitPool();
long getActivePool();
RateSpec getRateSpec();
long restart();
void start();
}

View File

@ -0,0 +1,329 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits.simrate;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.components.NBBaseComponent;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.time.Duration;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
/**
* <H2>Invariants</H2>
* <UL>
* <LI>When filler is defined, the pool is being replenished, and the rate limiter is active.</LI>
* <LI>Any changes to filler state or filler actions must be guarded by the filler lock.</LI>
* <LI>Summary stats are accumulated when the filler is stopped.</LI>
* <LI>State is initialized when the filler is started.</LI>
* </UL>
* <p>
* In order to use {@link java.util.concurrent.Semaphore}, the canonical implementation which is designed to work
* best with virtual threads, we have to scale time so that the token bucket will fit within 2^31.
* To make this work across a range of rates from very slow to very fast, the resolution of time tracking has
* to be set according to the rate specified.
* <HR/>
* <P>Explanation:</P>
* <UL>
* <LI>The time divisor in the rate is properly established as <EM>per interval</EM>. Conventionally, a
* rate specified as "46Kops" is taken to mean "46Kops/s" or <EM>per second.</EM></LI>
* <LI>The time which passes on the wall clock is the inverse of this, or in the example above,
* <EM>1/46000</EM> of a second (21739 nanoseconds ideally).</LI>
* <LI>At lower rates, like 0.01 ops/s, a number of seconds must pass with time accumulating into the token pool.
* For 0.01/s, the number of nanoseconds representing a single op is 100_000_000_000, or more than 46 times
* the value which is representable in a 32 bit semaphore.</LI>
* <LI>By scaling the time unit, 0.01 ops/s can be represented as microseconds without losing significant timing
* resolution with respect to the rate.</LI>
* <LI>This scale factor works well to accommodate burst ratios up to 100%</LI>
* </UL>
*/
public class SimRate extends NBBaseComponent implements RateLimiter, Thread.UncaughtExceptionHandler {
private final static Logger logger = LogManager.getLogger(SimRate.class);
private final Semaphore activePool = new Semaphore(0);
private final AtomicLong waitingPool = new AtomicLong(0L);
private Thread filler;
private AtomicLong lastRefillAt = new AtomicLong(System.nanoTime());
private boolean running = true;
private long refillIntervalNanos = 10_000_000;
private int maxActivePool, burstPoolSize, maxOverActivePool, ticksPerOp;
private SimRateSpec spec;
private long blocks;
private final ReentrantLock fillerLock = new ReentrantLock(false);
private AtomicLong cumulativeWaitTimeTicks = new AtomicLong(0L);
private long startTime;
public SimRate(NBComponent parent, SimRateSpec spec) {
super(parent, NBLabels.forKV());
this.spec = spec;
startFiller();
}
private void startFiller() {
try {
applySpec(spec);
fillerLock.lock();
if (this.filler != null) {
logger.debug("filler already started, no changes");
return;
}
this.filler = new Thread(new FillerRunnable());
filler.setName("FILLER");
filler.setUncaughtExceptionHandler(this);
filler.start();
} finally {
fillerLock.unlock();
}
}
private void stopFiller() {
try {
fillerLock.lock();
if (filler == null) {
logger.debug("filler already stopped, no changes");
return;
}
running = false;
logger.trace("STARTED awaiting filler thread join");
filler.join();
logger.trace("FINISHED awaiting filler thread join");
filler = null;
accumulateStats();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
fillerLock.unlock();
}
}
public long refill() {
try {
fillerLock.lock();
// checkpoint delta
long now = System.nanoTime();
long newNanoTokens = now - lastRefillAt.get();
lastRefillAt.addAndGet(newNanoTokens);
long intOverFlowNanoTokens = (newNanoTokens - Integer.MAX_VALUE);
if (intOverFlowNanoTokens > 0) {
waitingPool.addAndGet(spec.nanosToTicks(intOverFlowNanoTokens));
newNanoTokens -= intOverFlowNanoTokens;
logger.warn(() -> "timer overflow with extra tokens=" + intOverFlowNanoTokens);
}
int newTokens = spec.nanosToTicks(newNanoTokens);
// We need between 0 and the amount of space left in the active pool, but never negative
final int needed = Math.max(this.maxActivePool - activePool.availablePermits(), 0);
// We put at most how many tokens we have, but never more than we need
final int allocatedToActivePool = Math.min(newTokens, needed);
// Actually make the adjustment to the active pool
this.activePool.release(allocatedToActivePool);
// overflow logic
// we have some tokens left over
final long newTokensLeftOver = newTokens - allocatedToActivePool;
// anything left over goes into the waiting pool
this.waitingPool.addAndGet(newTokensLeftOver);
// bursting logic (backfilling waiting pool into active pool)
// we can move some of the waiting pool (lost time) tokens to the active pool
// to provide bursting up to a limit, but the amount is normalized over time,
// using the active pool capacity as the denominator. This means that 1/4 of a
// second gets 1/4 of the burst and so on.
// We only want to apply burst according to the amount of time we have relative
// to how much time fits into one basic time unit.
final double refillFactor = Math.min((double) newTokens / this.maxActivePool, 1.0D);
int burstFillAllowed = (int) (refillFactor * this.burstPoolSize);
burstFillAllowed = Math.min(this.maxOverActivePool - this.activePool.availablePermits(), burstFillAllowed);
// we can only burst up to our burst limit, but only as much time as we have in the waiting pool already
final int burstFill = (int) Math.min(burstFillAllowed, this.waitingPool.get());
this.waitingPool.addAndGet(-burstFill);
this.activePool.release(burstFill);
// System.out.print(this);
// System.out.print(ANSI_BrightBlue + " adding=" + allocatedToActivePool);
// if (0 < newTokensLeftOver)
// System.out.print(ANSI_Red + " OVERFLOW:" + newTokensLeftOver + ANSI_Reset);
// if (0 < burstFill) System.out.print(ANSI_BrightGreen + " BACKFILL:" + burstFill + ANSI_Reset);
// if (intOverflowTokens>0) {
// System.out.println(ANSI_BrightYellow+ "OVERFLOW:"+intOverflowTokens + ANSI_Reset);
// }
// System.out.println();
long waiting = this.activePool.availablePermits() + this.waitingPool.get();
return waiting;
} finally {
fillerLock.unlock();
}
}
@Override
public void applyRateSpec(SimRateSpec updatingSimRateSpec) {
try {
fillerLock.lock();
if (null == updatingSimRateSpec) throw new RuntimeException("RateSpec must be defined");
if (updatingSimRateSpec.verb == SimRateSpec.Verb.stop || updatingSimRateSpec.verb == SimRateSpec.Verb.restart) {
if (filler != null) {
stopFiller();
}
}
applySpec(updatingSimRateSpec);
if (updatingSimRateSpec.verb == SimRateSpec.Verb.start || updatingSimRateSpec.verb == SimRateSpec.Verb.restart) {
if (filler == null) {
startFiller();
}
}
} finally {
fillerLock.unlock();
}
}
private void accumulateStats() {
this.cumulativeWaitTimeTicks.addAndGet(this.waitingPool.get());
}
@Override
public Duration getWaitTimeDuration(){
return Duration.of(waitingPool.get(), this.spec.unit);
}
@Override
public double getWaitTimeSeconds() {
Duration wait = getWaitTimeDuration();
return (double) wait.getSeconds() + (wait.getNano()/1_000_000_000d);
}
public void applySpec(SimRateSpec simRateSpec) {
ticksPerOp = simRateSpec.ticksPerOp();
maxActivePool = 1_000_000_000;
maxOverActivePool = (int) (this.maxActivePool * simRateSpec.burstRatio());
burstPoolSize = this.maxOverActivePool - this.maxActivePool;
this.startTime = System.nanoTime();
}
public long block() {
this.blocks++;
try {
this.activePool.acquire(ticksPerOp);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return this.waitingPool.get() + this.activePool.availablePermits();
}
@Override
public void uncaughtException(Thread t, Throwable e) {
logger.error(e);
}
@Override
public SimRateSpec getSpec() {
return spec;
}
private final class FillerRunnable implements Runnable {
private long fills = 0L;
private int charat = 0;
@Override
public void run() {
System.out.print("_");
while (SimRate.this.running) {
fills++;
// if ((fills%100)==0) {
// System.out.print(chars[charat++%chars.length]);
// System.out.print(ANSI_CURSOR_LEFT);
// System.out.flush();
// }
// logger.debug(() -> "refilling");
SimRate.this.refill();
LockSupport.parkNanos(refillIntervalNanos);
}
System.out.println("stopping filler");
}
}
@Override
public String toString() {
return String.format(
"{ active:%d, max:%d, fill:'(%,3.1f%%)A (%,3.1f%%)B', wait_ns:%,d, blocks:%,d lock:%s}",
this.activePool.availablePermits(), this.maxActivePool,
(double) this.activePool.availablePermits() / this.maxActivePool * 100.0,
(double) this.activePool.availablePermits() / this.maxOverActivePool * 100.0,
this.waitingPool.get(),
this.blocks,
this.fillerLock.isLocked() ? "LOCKED" : "UNLOCKED"
);
}
public <U, V> Function<U, V> wrap(Function<U, V> f) {
return new Wrapper<>(this, f);
}
public static class Wrapper<I, O> implements Function<I, O> {
private final Function<I, O> function;
private final SimRate ratelimiter;
public Wrapper(SimRate ratelimiter, Function<I, O> function) {
this.function = function;
this.ratelimiter = ratelimiter;
}
@Override
public O apply(I i) {
ratelimiter.block();
return function.apply(i);
}
}
@Override
public Duration getTotalWaitTimeDuration() {
Duration d1 = Duration.of(waitingPool.get(), this.spec.unit);
Duration d2 = Duration.of(cumulativeWaitTimeTicks.get(),this.spec.unit);
return d1.plus(d2);
}
@Override
public long getStartTime() {
return startTime;
}
}
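Putting the new pieces together, a hypothetical consumer of SimRate (assuming an NBComponent parent is in scope) could gate a function as sketched below; SimRateSpec(5000, 1.1) means 5000 ops/s with a 1.1 burst ratio, per the spec format documented in SimRateSpec.

    import java.util.function.Function;
    import io.nosqlbench.components.NBComponent;

    // Hypothetical wiring of the new SimRate; everything except SimRate, SimRateSpec,
    // wrap(), and block() is illustrative.
    final class SimRateUsageSketch {
        static Function<String, String> rateLimitedUppercase(NBComponent parent) {
            SimRate rate = new SimRate(parent, new SimRateSpec(5000d, 1.1d));
            return rate.wrap(s -> s.toUpperCase());   // wrap() calls block() before each apply()
        }
    }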

View File

@ -14,13 +14,15 @@
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
package io.nosqlbench.engine.api.activityapi.ratelimits.simrate;
import io.nosqlbench.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.api.engine.util.Unit;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import java.time.temporal.ChronoUnit;
/**
* <H2>Rate Limiter Specifications</H2>
*
@ -32,7 +34,7 @@ import org.apache.logging.log4j.LogManager;
* </P>
*
* <H2>Controlling Rate Limiters</H2>
*
* <p>
* Rate limiters specifiers can be easily constructed programmatically. However, in scripting,
* these will often be controlled by assigning a configuration string.
*
@ -45,14 +47,14 @@ import org.apache.logging.log4j.LogManager;
* <LI>&lt;rate&gt;,&lt;burst ratio&gt;</LI>
* <LI>&lt;rate&gt;,&lt;burst ratio&gt;,&lt;verb&gt;</LI>
* </UL>
*
* <p>
* Where:
*
* <EM>rate</EM> is the ops per second, expressed as any positive floating point value.
* <EM>burst ratio</EM> is a floating point value greater than 1.0 which determines how much faster
* the rate limiter may go to catch up to the overall rate.
* <EM>verb</EM> is one of configure, start, or restart, as explained below.
*
* <p>
* For example:
* <UL>
* <LI>200 - allow up to 200 ops to start per second, with the default burst ratio of 1.1.
@ -95,17 +97,40 @@ import org.apache.logging.log4j.LogManager;
* <DT>restart</DT>
* <DD>Restarting a rate limiter is the same as starting it initially. The only difference is that
* restarting forces a re-initialization as part of the configuration.</DD>
* <p>
* <HR/>
* <P>ChronoUnit Examples</P>
* <PRE>{@code
* example   ticks    op/s          seconds/op      ticks/op
* -----------------------------------------------------------------
* 1         ns       50   ops/s    0.02, 1/50        20_000_000 (1s of ns / 50)
* 2         ns        5   ops/s    0.2,  1/5        200_000_000 (1s of ns / 5)
* 3         ns       0.5  ops/s    2.0,  2/1      2_000_000_000 (1s of ns / 0.5)
* 4         us       0.5  ops/s    2.0,  2/1          2_000_000 (1s of us / 0.5)
* 5         ms       0.5  ops/s    2.0,  2/1              2_000 (1s of ms / 0.5)
* }</PRE>
*
* </DL>
* <UL>
* <LI>In examples 1 and 2, the ticks/op are comfortably within the 2147483648 (2^31)
* range afforded by a 32-bit semaphore count.</LI>
* <LI>Example 3 shows a value that technically fits, but leaves only about an 8% margin
* for the burst pool. This is insufficient, since the burst pool should be allowed to be
* up to 100% of the size of the base rate pool.</LI>
* <LI>This is remedied in examples 4 and 5 by adjusting the unit of allocation
* to bigger chunks of time.</LI>
* </UL>
*/
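Editor's note: to make the spec grammar and tick arithmetic documented above concrete, here is a small hedged example using only constructors and methods added in this file; the numeric results follow directly from chronoUnitFor and ticksPerOp below.
SimRateSpec fast = new SimRateSpec("2500,1.2,restart"); // rate 2500 ops/s, burst ratio 1.2, verb restart
// 2500 > 1.0, so the unit is ChronoUnit.NANOS and ticksPerOp() == 1_000_000_000 / 2500 == 400_000
SimRateSpec slow = new SimRateSpec(0.25d, 1.1d);        // one op every four seconds
// 0.25 falls in the (0.001, 1.0] band, so the unit is ChronoUnit.MICROS
// ticksPerOp() == (int) (1_000_000 / 0.25) == 4_000_000, comfortably below 2^31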
public class RateSpec {
private final static Logger logger = LogManager.getLogger(RateSpec.class);
public class SimRateSpec {
private final static Logger logger = LogManager.getLogger(SimRateSpec.class);
public static final double DEFAULT_RATE_OPS_S = 1.0D;
public static final double DEFAULT_BURST_RATIO = 1.1D;
public static Verb DEFAULT_VERB = Verb.start;
public ChronoUnit unit;
/**
* Target rate in Operations Per Second
*/
@@ -113,9 +138,36 @@ public class RateSpec {
public double burstRatio = DEFAULT_BURST_RATIO;
public Verb verb = Verb.start;
public double burstRatio() {
return this.burstRatio;
}
public int ticksPerOp() {
return switch (unit) {
case NANOS -> (int) (1_000_000_000d / opsPerSec);
case MICROS -> (int) (1_000_000d / opsPerSec);
case MILLIS -> (int) (1_000d / opsPerSec);
case SECONDS -> (int) (1d / opsPerSec);
default -> throw new RuntimeException("invalid ChronoUnit for rate spec:" + unit);
};
}
public int nanosToTicks(long newNanoTokens) {
if (newNanoTokens>Integer.MAX_VALUE) {
throw new RuntimeException("time base error with nanoseconds to ticks, value (" + newNanoTokens + ") is too large (>2^31!)");
}
return switch (unit) {
case NANOS -> (int) newNanoTokens;
case MICROS -> (int) (newNanoTokens/1_000L);
case MILLIS -> (int) (newNanoTokens/1_000_000L);
case SECONDS -> (int) (newNanoTokens/1_000_000_000L);
default -> throw new RuntimeException("invalid ChronoUnit for rate spec:" + unit);
};
}
/**
* Rate limiters can be put into motion in different modes to suit different scenarios. This is
* mostly to support advanced scripting capability. When the verb is not specified in a {@link RateSpec},
* mostly to support advanced scripting capability. When the verb is not specified in a {@link SimRateSpec},
* then it is started immediately as a user would expect.
*/
public enum Verb {
@@ -143,28 +195,49 @@ public class RateSpec {
* target rates, where each iteration is independent of the others. In order to restart, a rate
* limiter will be configured if necessary.
*/
restart
restart,
stop
}
public RateSpec(double opsPerSec, double burstRatio) {
public SimRateSpec(double opsPerSec, double burstRatio) {
this(opsPerSec, burstRatio, DEFAULT_VERB);
}
public RateSpec(double opsPerSec, double burstRatio, Verb type) {
public SimRateSpec(double opsPerSec, double burstRatio, Verb type) {
apply(opsPerSec, burstRatio, type); // pass the constructor's verb argument rather than the field's default
}
private void apply(double opsPerSec, double burstRatio, Verb verb) {
this.opsPerSec = opsPerSec;
this.burstRatio = burstRatio;
this.verb = type;
this.verb = verb;
this.unit = chronoUnitFor(opsPerSec);
// TODO: include burst into ticks calculation
}
public RateSpec(ParameterMap.NamedParameter tuple) {
this(tuple.value);
if (tuple.name.startsWith("co_")) {
logger.warn("The co_ prefix on " + tuple.name + " is no longer needed. All rate limiters now provide standard coordinated omission metrics.");
private ChronoUnit chronoUnitFor(double opsPerSec) {
if (opsPerSec > 1.0d) {
return ChronoUnit.NANOS;
}
if (opsPerSec > 0.001d) {
return ChronoUnit.MICROS;
}
if (opsPerSec > 0.000001d) {
return ChronoUnit.MILLIS;
}
return ChronoUnit.SECONDS;
}
public RateSpec(String spec) {
public SimRateSpec(ParameterMap.NamedParameter tuple) {
this(tuple.value);
}
public SimRateSpec(String spec) {
String[] specs = spec.split("[,:;]");
Verb verb = Verb.start;
double burstRatio = DEFAULT_BURST_RATIO;
double opsPerSec;
switch (specs.length) {
case 3:
verb = Verb.valueOf(specs[2].toLowerCase());
@@ -180,6 +253,7 @@ public class RateSpec {
default:
throw new RuntimeException("Rate specs must be either '<rate>' or '<rate>:<burstRatio>' as in 5000.0 or 5000.0:1.0");
}
apply(opsPerSec, burstRatio, verb);
}
public String toString() {
@@ -193,23 +267,19 @@ public class RateSpec {
return String.format("{ rate:'%s', burstRatio:'%.3f', SOPSS:'%s', BOPSS:'%s', verb:'%s' }", ratefmt, burstRatio, ratefmt, burstfmt, verb);
}
public RateSpec withOpsPerSecond(double rate) {
return new RateSpec(rate, this.burstRatio);
public SimRateSpec withOpsPerSecond(double rate) {
return new SimRateSpec(rate, this.burstRatio);
}
public RateSpec withBurstRatio(double burstRatio) {
return new RateSpec(this.opsPerSec, burstRatio);
public SimRateSpec withBurstRatio(double burstRatio) {
return new SimRateSpec(this.opsPerSec, burstRatio);
}
public RateSpec withVerb(Verb verb) {
return new RateSpec(this.opsPerSec, this.burstRatio, verb);
public SimRateSpec withVerb(Verb verb) {
return new SimRateSpec(this.opsPerSec, this.burstRatio, verb);
}
public long getNanosPerOp() {
return (long) (1E9 / opsPerSec);
}
public double getRate() {
return this.opsPerSec;
}
@@ -223,10 +293,10 @@ public class RateSpec {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
RateSpec rateSpec = (RateSpec) o;
SimRateSpec simRateSpec = (SimRateSpec) o;
if (Double.compare(rateSpec.opsPerSec, opsPerSec) != 0) return false;
return Double.compare(rateSpec.burstRatio, burstRatio) == 0;
if (Double.compare(simRateSpec.opsPerSec, opsPerSec) != 0) return false;
return Double.compare(simRateSpec.burstRatio, burstRatio) == 0;
}
@Override

View File

@@ -0,0 +1,96 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits.simrate;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
public class SimThreads {
private final Semaphore semaphore = new Semaphore(Integer.MAX_VALUE, true) {{
try {
acquire(Integer.MAX_VALUE);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}};
private boolean running = true;
private Spec params = new Spec(0);
public SimThreads(Spec params) {
applySpec(params);
}
SimThreads(String concurrency) {
this(new Spec(Integer.parseInt(concurrency)));
}
public SimThreads applySpec(Spec newSpec) {
if (newSpec.equals(this.params)) {
return this;
}
int effectiveConcurrency = this.params.concurrency();
// releasing slots is uncontended
while (newSpec.concurrency() > effectiveConcurrency) {
int diff = newSpec.concurrency() - effectiveConcurrency;
semaphore.release(diff);
effectiveConcurrency+=diff;
}
// acquiring (locking out) slots is contended
while (newSpec.concurrency() < effectiveConcurrency) {
try {
semaphore.acquire();
effectiveConcurrency -= 1; // each acquired permit removes one active slot
} catch (InterruptedException ignored) {
}
}
this.params = newSpec;
return this;
}
public <U,V> Function<U,V> wrap(Function<U,V> f) {
return new Wrapper<>(this, f);
}
public static record Spec(int concurrency) {
}
public static class Wrapper<I,O> implements Function<I,O> {
private final Function<I, O> function;
private final SimThreads simThreads;
public Wrapper(SimThreads simThreads, Function<I,O> function) {
this.function = function;
this.simThreads = simThreads;
}
@Override
public O apply(I i) {
try {
simThreads.semaphore.acquire();
return function.apply(i);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
simThreads.semaphore.release();
}
}
}
@Override
public String toString() {
return "concurrency " + (this.params.concurrency-this.semaphore.availablePermits()) + " / " + this.params.concurrency;
}
}
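Editor's note: a hedged sketch of how this concurrency limiter composes with the wrap idiom; the workload lambda is hypothetical and only methods defined above are used.
SimThreads limiter = new SimThreads(new SimThreads.Spec(8));           // allow at most 8 concurrent executions
Function<Long, String> guarded = limiter.wrap(cycle -> "op " + cycle); // each call now holds one permit while it runs
limiter.applySpec(new SimThreads.Spec(2));                             // later, tighten concurrency in place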

View File

@@ -26,7 +26,7 @@ import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiters;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateSpec;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRateSpec;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.api.labels.NBLabels;
@@ -305,12 +305,12 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) {
activityDef.getParams().getOptionalNamedParameter("striderate")
.map(RateSpec::new)
.ifPresent(spec -> strideLimiter = RateLimiters.createOrUpdate(this, "strides", strideLimiter, spec));
.map(SimRateSpec::new)
.ifPresent(spec -> strideLimiter = RateLimiters.createOrUpdate(this, strideLimiter, spec));
activityDef.getParams().getOptionalNamedParameter("cyclerate", "targetrate", "rate")
.map(RateSpec::new).ifPresent(
spec -> cycleLimiter = RateLimiters.createOrUpdate(this, "cycles", cycleLimiter, spec));
.map(SimRateSpec::new).ifPresent(
spec -> cycleLimiter = RateLimiters.createOrUpdate(this, cycleLimiter, spec));
}
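Editor's note (hedged illustration): with this wiring, activity parameters flow straight into SimRateSpec, so a hypothetical activity defined with cyclerate=10000,1.5 is equivalent to the direct call below: 10000 ops/s with 50% burst headroom, started immediately since no verb is given.
cycleLimiter = RateLimiters.createOrUpdate(this, cycleLimiter, new SimRateSpec("10000,1.5"));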

View File

@@ -212,7 +212,7 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
if (strideRateLimiter != null) {
// block for strides rate limiter
strideRateLimiter.start();
strideRateLimiter.block();
}
long strideDelay = 0L;
@@ -251,7 +251,7 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
if (strideRateLimiter != null) {
// block for strides rate limiter
strideDelay = strideRateLimiter.maybeWaitForOp();
strideDelay = strideRateLimiter.block();
}
StrideTracker<D> strideTracker = new StrideTracker<>(
@@ -283,7 +283,7 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
if (cycleRateLimiter != null) {
// Block for cycle rate limiter
cycleDelay = cycleRateLimiter.maybeWaitForOp();
cycleDelay = cycleRateLimiter.block();
}
try {
@@ -354,7 +354,7 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
if (strideRateLimiter != null) {
// block for strides rate limiter
strideDelay = strideRateLimiter.maybeWaitForOp();
strideDelay = strideRateLimiter.block();
}
long strideStart = System.nanoTime();
@@ -378,7 +378,7 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
if (cycleRateLimiter != null) {
// Block for cycle rate limiter
cycleDelay = cycleRateLimiter.maybeWaitForOp();
cycleDelay = cycleRateLimiter.block();
}
long cycleStart = System.nanoTime();

View File

@@ -1,339 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.engine.api.util.Colors;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.api.engine.metrics.DeltaHdrHistogramReservoir;
import io.nosqlbench.api.testutils.Bounds;
import io.nosqlbench.api.testutils.Perf;
import io.nosqlbench.api.testutils.Result;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;
public class RateLimiterPerfTestMethods {
// public Perf testFindOverheads(RateLimiter rl) {
// List<Result> results = new ArrayList<>();
// Perf perf = new Perf("perf tests for " + rl);
// perf.add(rateLimiterSingleThreadedConvergence(rl));
//// perf.add(systemTimeOverhead(rl));
//// perf.add(conditionOverhead());
// return perf;
// }
public Result systemTimeOverhead(final RateLimiter rl) {
final Bounds bounds = new Bounds(1000, 2);
final Perf perf = new Perf("nanotime");
while (!perf.isConverged(Result::getOpsPerSec, 0.01d, 3)) {
System.out.println("testing with opcount=" + bounds.getNextValue());
final long start = System.nanoTime();
for (long iter = 0; iter < bounds.getValue(); iter++) {
final long result = System.nanoTime();
}
final long end = System.nanoTime();
perf.add("nanotime/" + bounds.getValue(), start, end, bounds.getValue());
}
final double[] deltas = perf.getDeltas(Result::getOpsPerSec);
return perf.getLastResult();
}
public Result rateLimiterSingleThreadedConvergence(final Function<RateSpec, RateLimiter> rlf, final RateSpec rs, final long startingCycles, final double margin) {
//rl.applyRateSpec(rl.getRateSpec().withOpsPerSecond(1E9));
final Bounds bounds = new Bounds(startingCycles, 2);
final Perf perf = new Perf("nanotime");
while (!perf.isConverged(Result::getOpsPerSec, margin, 3)) {
System.out.println("testing with opcount=" + bounds.getNextValue() + " spec=" + rs);
final RateLimiter rl = rlf.apply(rs);
final long start = System.nanoTime();
for (long iter = 0; iter < bounds.getValue(); iter++) {
final long result = rl.maybeWaitForOp();
}
final long end = System.nanoTime();
perf.add("rl/" + bounds.getValue(), start, end, bounds.getValue());
System.out.println(perf.getLastResult());
}
return perf.getLastResult();
}
/**
* This test method will call {@link RateLimiter#maybeWaitForOp()} on a rate limiter with a sequence of different
* getOpsPerSec specifiers. For each 4-tuple in the second varargs argument, the following fields
* are used to control how the getOpsPerSec limiter is configured and called:
*
* <OL>
* <LI>count - how many times to call maybeWaitForOp</LI>
* <LI>getOpsPerSec - the getOpsPerSec to set the getOpsPerSec limiter to</LI>
* <LI>divisions - the number of sub-segments to iterate and record</LI>
* <LI>clientrate - the artificially limited client getOpsPerSec</LI>
* </OL>
*
* @param count_rate_division_clientrate
* @return
*/
long[] testRateChanges(final RateLimiter rl, final int... count_rate_division_clientrate) {
System.out.println("Running " + Thread.currentThread().getStackTrace()[1].getMethodName());
final List<Long> results = new ArrayList<>();
for (int idx = 0; idx < count_rate_division_clientrate.length; idx += 4) {
final int count = count_rate_division_clientrate[idx];
final int rate = count_rate_division_clientrate[idx + 1];
final int divisions = count_rate_division_clientrate[idx + 2];
final int clientrate = count_rate_division_clientrate[idx + 3];
final long clientnanos = (long) (1_000_000_000.0D / clientrate);
if (rl instanceof DiagUpdateRate) {
((DiagUpdateRate) rl).setDiagModulo(count / divisions);
System.out.println("updating every " + count / divisions + " calls (" + count + '/' + divisions + ')');
}
System.out.println("count=" + count + ", getOpsPerSec=" + rate + ", div=" + divisions + ", clientrate=" + clientrate);
System.out.println("client nanos: " + clientnanos);
final long startAt = System.nanoTime();
rl.applyRateSpec(rl.getRateSpec().withOpsPerSecond(rate));
final int perDivision = count / divisions;
long divDelay = 0L;
for (int div = 0; div < divisions; div++) {
long then = System.nanoTime();
for (int i = 0; i < perDivision; i++) {
then += clientnanos;
rl.maybeWaitForOp();
while (System.nanoTime() < then) {
}
}
divDelay = rl.maybeWaitForOp();
results.add(divDelay);
}
final long endAt = System.nanoTime();
final double duration = (endAt - startAt) / 1000000000.0d;
final double acqops = count / duration;
System.out.println(rl);
System.out.println(Colors.ANSI_Blue +
String.format(
"spec: %s\n count: %9d, duration %.5fS, acquires/s %.3f, nanos/op: %f\n delay: %d (%.5fS)",
rl.getRateSpec(),
count, duration, acqops, 1_000_000_000.0d / acqops, divDelay, divDelay / 1_000_000_000.0d) +
Colors.ANSI_Reset);
}
final long[] delays = results.stream().mapToLong(Long::longValue).toArray();
final String delaySummary = Arrays.stream(delays).mapToDouble(d -> d / 1_000_000_000.0D).mapToObj(d -> String.format("%.3f", d))
.collect(Collectors.joining(","));
System.out.println("delays in seconds:\n" + delaySummary);
System.out.println("delays in ns:\n" + Arrays.toString(delays));
return delays;
}
public Result rateLimiterContendedConvergence(final int threads, final Function<RateSpec, RateLimiter> rlFunc, final RateSpec rateSpec, final int initialIterations, final double margin) {
final Bounds bounds = new Bounds(initialIterations, 2);
final Perf perf = new Perf("contended with " + threads + " threads");
while (!perf.isConverged(Result::getOpsPerSec, margin, 3)) {
final Perf delegateperf = this.testRateLimiterMultiThreadedContention(rlFunc, rateSpec, initialIterations, threads);
perf.add(delegateperf.getLastResult());
}
return perf.getLastResult();
}
/**
* This a low-overhead test for multi-threaded access to the same getOpsPerSec limiter. It calculates the
* effective concurrent getOpsPerSec under atomic contention.
*/
public Perf testRateLimiterMultiThreadedContention(final Function<RateSpec, RateLimiter> rlFunc, final RateSpec spec, final long iterations, final int threadCount) {
System.out.println("Running " + Thread.currentThread().getStackTrace()[1].getMethodName());
final RateLimiter rl = rlFunc.apply(spec);
final double rate = spec.getRate();
final int iterationsPerThread = (int) (iterations / threadCount);
if (Integer.MAX_VALUE <= iterationsPerThread)
throw new RuntimeException("iterations per thread too high with (count,threads)=(" + iterations + ',' + threadCount);
final TestExceptionHandler errorhandler = new TestExceptionHandler();
final TestThreadFactory threadFactory = new TestThreadFactory(errorhandler);
final ExecutorService tp = Executors.newFixedThreadPool(threadCount + 1, threadFactory);
System.out.format("Running %,d iterations split over %,d threads (%,d per) at %,.3f ops/s\n", iterations, threadCount, iterations / threadCount, rate);
final Acquirer[] threads = new Acquirer[threadCount];
final DeltaHdrHistogramReservoir stats = new DeltaHdrHistogramReservoir(NBLabels.forKV("name", "times"), 5);
final CyclicBarrier barrier = new CyclicBarrier(threadCount + 1);
final RateLimiterStarter starter = new RateLimiterStarter(barrier, rl);
// threads[i] = new RateLimiterPerfTestMethods.Acquirer(i, rl, (int) (iterations / threadCount), stats, barrier);
for (int i = 0; i < threadCount; i++) threads[i] = new Acquirer(i, rl, iterationsPerThread, stats, barrier);
tp.execute(starter);
System.out.println(rl);
System.out.format("submitting (%d threads)...\n", threads.length);
final List<Future<Result>> futures = new ArrayList<>();
for (int i = 0; i < threadCount; i++) futures.add(tp.submit((Callable<Result>) threads[i]));
System.out.format("submitted (%d threads)...\n", threads.length);
try {
tp.shutdown();
if (!tp.awaitTermination(1000, TimeUnit.SECONDS))
throw new RuntimeException("Failed to shutdown thread pool.");
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
errorhandler.throwIfAny();
System.out.println(rl);
final Perf aggregatePerf = new Perf("contended with " + threadCount + " threads for " + iterations + " iterations for " + rl.getRateSpec().toString());
futures.stream().map(f -> {
try {
return f.get();
} catch (final Exception e) {
throw new RuntimeException(e);
}
}).forEachOrdered(aggregatePerf::add);
// System.out.println(aggregatePerf);
// if (rl instanceof HybridRateLimiter) {
// String refillLog = ((HybridRateLimiter) rl).getRefillLog();
// System.out.println("refill log:\n" + refillLog);
// }
final Perf perf = aggregatePerf.reduceConcurrent();
return perf;
}
private static class RateLimiterStarter implements Runnable {
private final CyclicBarrier barrier;
private final RateLimiter rl;
public RateLimiterStarter(final CyclicBarrier barrier, final RateLimiter rl) {
this.barrier = barrier;
this.rl = rl;
}
@Override
public void run() {
try {
// System.out.println("awaiting barrier (starter) (" + barrier.getNumberWaiting() + " awaiting)");
this.barrier.await(60, TimeUnit.SECONDS);
// System.out.println("started the rate limiter (starter) (" + barrier.getNumberWaiting() + " awaiting)");
} catch (final Exception e) {
throw new RuntimeException(e);
}
this.rl.start();
}
}
private static class TestExceptionHandler implements UncaughtExceptionHandler {
public List<Throwable> throwables = new ArrayList<>();
public List<Thread> threads = new ArrayList<>();
@Override
public void uncaughtException(final Thread t, final Throwable e) {
this.threads.add(t);
this.throwables.add(e);
System.out.println("uncaught exception on thread " + t.getName() + ": " + e.toString());
}
public void throwIfAny() {
if (0 < throwables.size()) throw new RuntimeException(this.throwables.get(0));
}
}
private static class Acquirer implements Callable<Result>, Runnable {
private final RateLimiter limiter;
private final int threadIdx;
private final DeltaHdrHistogramReservoir reservoir;
private final CyclicBarrier barrier;
private final long iterations;
public Acquirer(final int i, final RateLimiter limiter, final int iterations, final DeltaHdrHistogramReservoir reservoir, final CyclicBarrier barrier) {
threadIdx = i;
this.limiter = limiter;
this.iterations = iterations;
this.reservoir = reservoir;
this.barrier = barrier;
}
@Override
public Result call() {
// synchronized (barrier) {
try {
if (0 == this.threadIdx) System.out.println("awaiting barrier");
this.barrier.await(60, TimeUnit.SECONDS);
if (0 == this.threadIdx) System.out.println("starting all threads");
} catch (final Exception be) {
throw new RuntimeException(be); // This should not happen unless the test is broken
}
// }
final long startTime = System.nanoTime();
for (int i = 0; i < this.iterations; i++) {
final long time = this.limiter.maybeWaitForOp();
}
final long endTime = System.nanoTime();
return new Result("thread " + threadIdx, startTime, endTime, this.iterations);
}
@Override
public void run() {
for (int i = 0; i < this.iterations; i++) this.limiter.maybeWaitForOp();
}
}
private static class TestThreadFactory implements ThreadFactory {
private final UncaughtExceptionHandler handler;
public TestThreadFactory(final UncaughtExceptionHandler uceh) {
handler = uceh;
}
@Override
public Thread newThread(final Runnable r) {
final Thread t = new Thread(r);
t.setUncaughtExceptionHandler(this.handler);
return t;
}
}
}

View File

@@ -16,33 +16,34 @@
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRateSpec;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class RateSpecTest {
public class SimRateSpecTest {
@Test
public void testDefaultRateSpecPattern() {
RateSpec r = new RateSpec("523");
SimRateSpec r = new SimRateSpec("523");
assertThat(r.getRate()).isEqualTo(523.0d);
assertThat(r.getBurstRatio()).isEqualTo(1.1d);
}
@Test
public void testBurstRatioPattern() {
RateSpec r = new RateSpec("12345,1.3");
SimRateSpec r = new SimRateSpec("12345,1.3");
assertThat(r.getRate()).isEqualTo(12345.0d);
assertThat(r.getBurstRatio()).isEqualTo(1.3d);
}
@Test
public void testTypeSelection() {
RateSpec a = new RateSpec("12345,1.4,configure");
assertThat(a.getVerb()).isEqualTo(RateSpec.Verb.configure);
RateSpec d = new RateSpec("12345,1.4,restart");
assertThat(d.verb).isEqualTo(RateSpec.Verb.restart);
RateSpec c = new RateSpec("12345,1.1");
assertThat(c.verb).isEqualTo(RateSpec.Verb.start);
SimRateSpec a = new SimRateSpec("12345,1.4,configure");
assertThat(a.getVerb()).isEqualTo(SimRateSpec.Verb.configure);
SimRateSpec d = new SimRateSpec("12345,1.4,restart");
assertThat(d.verb).isEqualTo(SimRateSpec.Verb.restart);
SimRateSpec c = new SimRateSpec("12345,1.1");
assertThat(c.verb).isEqualTo(SimRateSpec.Verb.start);
}
}
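Editor's note: since this commit also adds a stop verb to SimRateSpec.Verb, a natural follow-on test (a sketch, not part of the diff) would be:
@Test
public void testStopVerbParsing() {
    SimRateSpec s = new SimRateSpec("100,1.1,stop");
    assertThat(s.getVerb()).isEqualTo(SimRateSpec.Verb.stop);
}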

View File

@@ -1,185 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.testutils.Perf;
import io.nosqlbench.api.testutils.Result;
import io.nosqlbench.components.NBBaseComponent;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.function.Function;
public class TestHybridRateLimiterPerf {
private final Function<RateSpec, RateLimiter> rlFunction = rs -> new HybridRateLimiter(NBBaseComponent.EMPTY_COMPONENT,"hybrid", rs.withVerb(RateSpec.Verb.start));
private final RateLimiterPerfTestMethods methods = new RateLimiterPerfTestMethods();
@Test
@Disabled
public void testPerf1e9() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E9, 1.1),10_000_000,0.01d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e8() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E8, 1.1),50_000_000,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e7() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E7, 1.1),5_000_000,0.01d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e6() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E6, 1.1),500_000,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e5() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E5, 1.1),50_000,0.01d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e4() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E4, 1.1),5_000,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e3() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E3, 1.1),500,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e2() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E2, 1.1),50,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e1() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E1, 1.1),5,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e0() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E0, 1.1),2,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testePerf1eN1() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E-1, 1.1),1,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void test100Mops_160threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 10_000_000,160);
System.out.println(perf.getLastResult());
}
@Test
@Disabled
public void test100Mops_80threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 10_000_000,80);
System.out.println(perf.getLastResult());
}
// 40 threads at 100_000_000 ops/s
// JVM 1.8.0_152
// 400000000_ops 29.819737_S 13413934.327_ops_s, 75_ns_op
// 800000000_ops 60.616158_S 13197801.155_ops_s, 76_ns_op
// JVM 11.0.1
// 400000000_ops 33.622751_S 11896706.363_ops_s, 84_ns_op
@Test
@Disabled
public void test100Mops_40threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 10_000_000,40);
System.out.println(perf.getLastResult());
}
// 20 threads at 100_000_000 ops/s
// JVM 1.8.0_152
// 200000000_ops 14.031716_S 14253424.087_ops_s, 70_ns_op
// 400000000_ops 35.918071_S 11136455.474_ops_s, 90_ns_op
// 400000000_ops 30.809579_S 12982975.401_ops_s, 77_ns_op
// 400000000_ops 36.985547_S 10815035.410_ops_s, 92_ns_op
// 200000000_ops 16.843876_S 11873751.403_ops_s, 84_ns_op
// 200000000_ops 17.382563_S 11505783.253_ops_s, 87_ns_op
// JVM 11.0.1
// 200000000_ops 12.247201_S 16330261.978_ops_s, 61_ns_op
// 200000000_ops 15.915484_S 12566379.106_ops_s, 80_ns_op
// 200000000_ops 17.691698_S 11304737.461_ops_s, 88_ns_op
@Test
@Disabled
public void test100Mops_20threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 10_000_000,20);
System.out.println(perf.getLastResult());
}
// 10 threads at 100_000_000 ops/s
// JVM 1.8.0_152
// 100000000_ops 5.369642_S 18623216.864_ops_s, 54_ns_op
// 200000000_ops 16.744912_S 11943926.287_ops_s, 84_ns_op
// 200000000_ops 17.474475_S 11445264.894_ops_s, 87_ns_op
// 200000000_ops 14.089247_S 14195222.897_ops_s, 70_ns_op
@Test
@Disabled
public void test100Mops_10threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 10_000_000,10);
System.out.println(perf.getLastResult());
}
// 5 threads at 100_000_000 ops/s
// JVM 1.8.0_152
// 50000000_ops 2.477219_S 20183923.068_ops_s, 50_ns_op
// 200000000_ops 10.422393_S 19189451.478_ops_s, 52_ns_op
// 200000000_ops 10.624822_S 18823844.646_ops_s, 53_ns_op
// JVM 11.0.1
// 200000000_ops 11.839666_S 16892368.438_ops_s, 59_ns_op
@Test
@Disabled
public void test100Mops_5threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 40_000_000,5);
System.out.println(perf.getLastResult());
}
}

View File

@@ -1,108 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.api.testutils.Perf;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.function.Function;
/**
* These tests are for sanity checking rate limiter implementations. They are not enabled by default,
* since they are very CPU an time intensive. If you are developing rate limiters, use these to understand
* throughput variations at different speeds and different levels of contention.
*
* This set is for 10M ops/s at different levels of contention.
*/
public class TestRateLimiterPerf1E7 {
private final Function<RateSpec, RateLimiter> rlFunction = rs -> new HybridRateLimiter(new TestComponent("alias","tokenrl"),"hybrid", rs.withVerb(RateSpec.Verb.configure));
private final RateLimiterPerfTestMethods methods = new RateLimiterPerfTestMethods();
// 160 threads at 10_000_000 ops/s
// JVM 11.0.1
// 160000000_ops 18.122886_S 8828615.971_ops_s, 113_ns_op
@Test
@Disabled
public void test10Mops_160threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E7, 1.1), 20_000_000,160);
System.out.println(perf.getLastResult());
}
// 80 threads at 10_000_000 ops/s
// JVM 11.0.1
// 80000000_ops 8.354648_S 9575507.945_ops_s, 104_ns_op
@Test
@Disabled
public void test10Mops_80threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E7, 1.1), 20_000_000,80);
System.out.println(perf.getLastResult());
}
// 40 threads at 10_000_000 ops/s
// JVM 11.0.1
// 40000000_ops 4.001661_S 9995849.116_ops_s, 100_ns_op
@Test
@Disabled
public void test10Mops_40threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E7, 1.1), 20_000_000,40);
System.out.println(perf.getLastResult());
}
// 20 threads at 10_000_000 ops/s
// JVM 11.0.1
// 20000000_ops 1.914366_S 10447323.063_ops_s, 96_ns_op
@Test
@Disabled
public void test10Mops_20threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E7, 10), 20_000_000,20);
System.out.println(perf.getLastResult());
}
// 10 threads at 10_000_000 ops/s
// JVM 11.0.1
// 10000000_ops 0.962764_S 10386764.060_ops_s, 96_ns_op
// 100000000_ops 9.842758_S 10159754.498_ops_s, 98_ns_op
// 100000000_ops 10.123873_S 9877642.338_ops_s, 101_ns_op
// 100000000_ops 10.078673_S 9921941.517_ops_s, 101_ns_op
@Test
@Disabled
public void test10Mops_10threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E7, 1.1), 20_000_000,10);
System.out.println(perf.getLastResult());
}
// 5 threads at 10_000_000 ops/s
// JVM 11.0.1
// 50000000_ops 4.804698_S 10406482.168_ops_s, 96_ns_op
// 50000000_ops 4.923481_S 10155416.143_ops_s, 98_ns_op
// 50000000_ops 4.924924_S 10152441.416_ops_s, 98_ns_op
// 50000000_ops 4.924924_S 10152441.416_ops_s, 98_ns_op
// 200000000_ops 19.761154_S 10120866.172_ops_s, 99_ns_op
// 200000000_ops 19.928625_S 10035815.505_ops_s, 100_ns_op
@Test
@Disabled
public void test10Mops_5threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E7, 1.1), 20_000_000,5);
System.out.println(perf.getLastResult());
}
}

View File

@@ -1,190 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.api.testutils.Perf;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.function.Function;
/**
* These tests are for sanity checking rate limiter implementations. They are not enabled by default,
* since they are very CPU an time intensive. If you are developing rate limiters, use these to understand
* throughput variations at different speeds and different levels of contention.
*
* This set is for 100M ops/s at different levels of contention.
*/
public class TestRateLimiterPerf1E8 {
NBLabeledElement def = NBLabeledElement.forKV("alias","tokenrl");
private final Function<RateSpec, RateLimiter> rlFunction =
rs -> new HybridRateLimiter(
new TestComponent("test","rltest"),
"hybrid",
rs.withVerb(RateSpec.Verb.configure)
);
private final RateLimiterPerfTestMethods methods = new RateLimiterPerfTestMethods();
@Test
@Disabled
public void test100Mops_4000threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(
this.rlFunction,
new RateSpec(1.0E8, 1.1),
100_000_000,
4000
);
System.out.println(perf.getLastResult());
}
@Test
@Disabled
public void test100Mops_2000threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(
this.rlFunction,
new RateSpec(1.0E8, 1.1),
100_000_000,
2000
);
System.out.println(perf.getLastResult());
}
@Test
@Disabled
public void test100Mops_1000threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(
this.rlFunction,
new RateSpec(1.0E8, 1.1),
100_000_000,
1000
);
System.out.println(perf.getLastResult());
}
@Test
@Disabled
public void test100Mops_320threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(
this.rlFunction,
new RateSpec(1.0E8, 1.1),
100_000_000,
320
);
System.out.println(perf.getLastResult());
}
// 160 threads at 100_000_000 ops/s
// 1600000000_ops 149.351811_S 10712960.186_ops_s, 93_ns_op
// JVM 11.0.1, Intel(R) Core(TM) i7-4790 CPU @ 3.60GHz
// 1600000000_ops 160.319831_S 9_980_050.444_ops_s, 100_ns_op
// 1600000000_ops 159.234501_S 10_048_073.673_ops_s, 100_ns_op
// 1600000000_ops 158.224286_S 10_112_227.620_ops_s, 99_ns_op
//
@Test
@Disabled
public void test100Mops_160threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(
this.rlFunction,
new RateSpec(1.0E8, 1.1),
100_000_000,
160
);
System.out.println(perf.getLastResult());
}
// 80 threads at 100_000_000 ops/s
// JVM 11.0.1, Intel(R) Core(TM) i7-4790 CPU @ 3.60GHz
// 800000000_ops 74.104295_S 10795595.534_ops_s, 93_ns_op
// 800000000_ops 74.155495_S 10788141.933_ops_s, 93_ns_op
@Test
@Disabled
public void test100Mops_80threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 100_000_000, 80);
System.out.println(perf.getLastResult());
}
// 40 threads at 100_000_000 ops/s
// JVM 1.8.0_152
// 400000000_ops 29.819737_S 13413934.327_ops_s, 75_ns_op
// 800000000_ops 60.616158_S 13197801.155_ops_s, 76_ns_op
// JVM 11.0.1, Intel(R) Core(TM) i7-4790 CPU @ 3.60GHz
// 400000000_ops 33.622751_S 11896706.363_ops_s, 84_ns_op
@Test
@Disabled
public void test100Mops_40threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 100_000_000, 40);
System.out.println(perf.getLastResult());
}
// 20 threads at 100_000_000 ops/s
// JVM 1.8.0_152
// 200000000_ops 14.031716_S 14253424.087_ops_s, 70_ns_op
// 400000000_ops 35.918071_S 11136455.474_ops_s, 90_ns_op
// 400000000_ops 30.809579_S 12982975.401_ops_s, 77_ns_op
// 400000000_ops 36.985547_S 10815035.410_ops_s, 92_ns_op
// 200000000_ops 16.843876_S 11873751.403_ops_s, 84_ns_op
// 200000000_ops 17.382563_S 11505783.253_ops_s, 87_ns_op
// JVM 11.0.1, Intel(R) Core(TM) i7-4790 CPU @ 3.60GHz
// 200000000_ops 12.247201_S 16330261.978_ops_s, 61_ns_op
// 200000000_ops 15.915484_S 12566379.106_ops_s, 80_ns_op
// 200000000_ops 17.691698_S 11304737.461_ops_s, 88_ns_op
@Test
@Disabled
public void test100Mops_20threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 100_000_000, 20);
System.out.println(perf.getLastResult());
}
// 10 threads at 100_000_000 ops/s
// JVM 1.8.0_152
// 100000000_ops 5.369642_S 18623216.864_ops_s, 54_ns_op
// 200000000_ops 16.744912_S 11943926.287_ops_s, 84_ns_op
// 200000000_ops 17.474475_S 11445264.894_ops_s, 87_ns_op
// 200000000_ops 14.089247_S 14195222.897_ops_s, 70_ns_op
// JVM 11.0.1, Intel(R) Core(TM) i7-4790 CPU @ 3.60GHz
// 100000000_ops 7.751758_S 12900299.587_ops_s, 78_ns_op
// 100000000_ops 7.864851_S 12714799.657_ops_s, 79_ns_op
@Test
@Disabled
public void test100Mops_10threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 100_000_000, 10);
System.out.println(perf.getLastResult());
}
// 5 threads at 100_000_000 ops/s
// JVM 1.8.0_152
// 50000000_ops 2.477219_S 20183923.068_ops_s, 50_ns_op
// 200000000_ops 10.422393_S 19189451.478_ops_s, 52_ns_op
// 200000000_ops 10.624822_S 18823844.646_ops_s, 53_ns_op
// JVM 11.0.1, Intel(R) Core(TM) i7-4790 CPU @ 3.60GHz
// 200000000_ops 11.839666_S 16892368.438_ops_s, 59_ns_op
// 50000000_ops 2.390485_S 20916254.150_ops_s, 48_ns_op
// 100000000_ops 6.317008_S 15830279.182_ops_s, 63_ns_op
// 200000000_ops 13.551712_S 14758282.931_ops_s, 68_ns_op
@Test
@Disabled
public void test100Mops_5threads() {
final Perf perf = this.methods.testRateLimiterMultiThreadedContention(this.rlFunction, new RateSpec(1.0E8, 1.1), 100_000_000, 5);
System.out.println(perf.getLastResult());
}
}

View File

@@ -1,117 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.api.testutils.Result;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.util.function.Function;
/**
* These tests are for sanity checking rate limiter implementations. They are not enabled by default,
* since they are very CPU an time intensive. If you are developing rate limiters, use these to understand
* throughput variations at different speeds and different levels of contention.
*
* This set is for single-threaded (uncontended) baselines, at different op rates.
*/
public class TestRateLimiterPerfSingle {
private final Function<RateSpec, RateLimiter> rlFunction = rs -> new HybridRateLimiter(new TestComponent("alias","tokenrl"),"hybrid", rs.withVerb(RateSpec.Verb.start));
private final RateLimiterPerfTestMethods methods = new RateLimiterPerfTestMethods();
@Test
@Disabled
public void testPerf1e9() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E9, 1.1),10_000_000,0.01d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e8() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E8, 1.1),50_000_000,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e7() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E7, 1.1),5_000_000,0.01d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e6() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E6, 1.1),500_000,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e5() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E5, 1.1),50_000,0.01d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e4() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E4, 1.1),5_000,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e3() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E3, 1.1),500,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e2() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E2, 1.1),50,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e1() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E1, 1.1),5,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testPerf1e0() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E0, 1.1),2,0.005d);
System.out.println(result);
}
@Test
@Disabled
public void testePerf1eN1() {
final Result result = this.methods.rateLimiterSingleThreadedConvergence(this.rlFunction,new RateSpec(1.0E-1, 1.1),1,0.005d);
System.out.println(result);
}
}

View File

@@ -1,53 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.components.NBComponent;
import java.util.concurrent.atomic.AtomicLong;
public class TestableHybridRateLimiter extends HybridRateLimiter {
private final AtomicLong clock;
private final static NBComponent parent = new TestComponent("rlparent","rlparent");
public TestableHybridRateLimiter(final AtomicLong clock, final RateSpec rateSpec, final NBLabeledElement def) {
super(parent, "test", rateSpec);
this.applyRateSpec(rateSpec);
this.setLabel("test");
this.clock = clock;
this.init(def);
}
public long setClock(final long newValue) {
final long oldValue = this.clock.get();
this.clock.set(newValue);
return oldValue;
}
public long getClock() {
return this.clock.get();
}
@Override
protected long getNanoClockTime() {
return this.clock.get();
}
}

View File

@@ -1,23 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
public interface TestableRateLimiter extends RateLimiter {
long setClock(long newValue);
long getClock();
}

View File

@@ -1,78 +0,0 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.ratelimits;
import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.api.labels.NBLabeledElement;
import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.activityimpl.ParameterMap;
import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class TokenPoolTest {
ActivityDef adef = new ActivityDef(ParameterMap.parseOrException("alias=testing"));
NBLabeledElement def = NBLabeledElement.forMap(this.adef.getParams().getStringStringMap());
TestComponent component = new TestComponent("test","component");
@Test
public void testBackfillFullRate() {
ThreadDrivenTokenPool p = new ThreadDrivenTokenPool(component,new RateSpec(10000000, 1.1), this.def);
assertThat(p.refill(1000000L)).isEqualTo(1000000L);
assertThat(p.getWaitPool()).isEqualTo(0L);
assertThat(p.refill(100L)).isEqualTo(1000100);
assertThat(p.getWaitPool()).isEqualTo(90L);
assertThat(p.refill(10L)).isEqualTo(1000110L);
assertThat(p.getWaitPool()).isEqualTo(99L);
assertThat(p.refill(10)).isEqualTo(1000120L);
assertThat(p.takeUpTo(100)).isEqualTo(100L);
}
@Test
public void testTakeRanges() {
ThreadDrivenTokenPool p = new ThreadDrivenTokenPool(component,new RateSpec(100, 10), def);
p.refill(100);
assertThat(p.takeUpTo(99)).isEqualTo(99L);
assertThat(p.takeUpTo(10)).isEqualTo(1L);
assertThat(p.takeUpTo(1L)).isEqualTo(0L);
}
@Test
public void testChangedParameters() {
RateSpec s1 = new RateSpec(1000L, 1.10D);
ThreadDrivenTokenPool p = new ThreadDrivenTokenPool(component,s1, def);
long r = p.refill(10000000);
assertThat(r).isEqualTo(10000000L);
assertThat(p.getWaitTime()).isEqualTo(10000000L);
RateSpec s2 = new RateSpec(1000000L, 1.10D);
p.apply(new NBLabeledElement() {
@Override
public NBLabels getLabels() {
return NBLabels.forKV("name","test");
}
},s2);
}
}

View File

@@ -0,0 +1,138 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.sandbox;
import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRateSpec;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRate;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.concurrent.TimeUnit;
@State(Scope.Group)
@Measurement(time = 10,timeUnit = TimeUnit.SECONDS)
public class SimRateTest {
private final NBComponent parent = new TestComponent("rltest","rltest");
public static void main(String[] args) {
Options jmhOptions = new OptionsBuilder()
// .include("simrate[0-9]+")
// .include("simrate(1|24|240)")
.forks(1)
.warmupBatchSize(1)
.warmupIterations(0)
.build();
try {
new Runner(jmhOptions).run();
} catch (RunnerException e) {
throw new RuntimeException(e);
}
}
private SimRate rl;
@Setup
public void setup() {
SimRateSpec spec = new SimRateSpec(1000000000.0,1.1);
// dang only 250M/s :]
rl = new SimRate(parent,spec);
}
@Benchmark
@Group("simrate1")
@GroupThreads(1)
@BenchmarkMode(Mode.Throughput)
@Disabled
public void tptest1() {
rl.block();
}
@Benchmark
@Group("simrate6")
@GroupThreads(6)
@BenchmarkMode(Mode.Throughput)
@Disabled
public void tptest6() {
rl.block();
}
@Benchmark
@Group("simrate12")
@GroupThreads(12)
@BenchmarkMode(Mode.Throughput)
@Disabled
public void tptest12() {
rl.block();
}
@Benchmark
@Group("simrate24")
@GroupThreads(24)
@BenchmarkMode(Mode.Throughput)
@Disabled
public void tptest24() {
rl.block();
}
@Benchmark
@Group("simrate240")
@GroupThreads(240)
@BenchmarkMode(Mode.Throughput)
@Disabled
public void tptest240() {
rl.block();
}
@Benchmark
@Group("simrate2400")
@GroupThreads(2400)
@BenchmarkMode(Mode.Throughput)
@Disabled
public void tptest2400() {
rl.block();
}
@Test
@Disabled
public void testBasicRate() {
SimRateSpec spec = new SimRateSpec(1000000000.0, 1.1);
// dang only 250M/s :]
SimRate rl = new SimRate(parent,spec);
for (long i = 0; i < 10000000000L; i++) {
long waiting = rl.block();
if ((i%100000000)!=0) continue;
System.out.println("op time:" + i);
}
}
@State(Scope.Group)
public static class ThreadCountState {
@Param({"1","12","24"})
public int threads;
}
}
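Editor's note: the commented-out include patterns in main() above hint at how to narrow a run; a hedged example restricting execution to the 24-thread group might look like this (Runner.run() throws RunnerException, handled as in main()).
Options narrowed = new OptionsBuilder()
    .include("simrate24") // match only the @Group("simrate24") benchmarks
    .forks(1)
    .build();
new Runner(narrowed).run();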

View File

@@ -28,7 +28,7 @@ import java.io.Reader;
import java.util.Map;
public class SceneBuilder implements SceneBuilderFacets.ALL {
private Map<String,String> params;
private Map<String,String> params = Map.of();
private ScenarioActivitiesController controller;
private Extensions extensions;
private PrintWriter out = new PrintWriter(System.out);

View File

@@ -51,6 +51,9 @@ public class NBBaseComponentMetrics implements NBComponentMetrics {
@Override
public List<NBMetric> findComponentMetrics(String pattern) {
if (this.metrics.containsKey(pattern)) {
return List.of(metrics.get(pattern));
}
TagFilter filter = new TagFilter(pattern);
return filter.filterLabeled(metrics.values());
}

View File

@@ -23,13 +23,13 @@ import static org.assertj.core.api.Assertions.assertThat;
class TestComponentViewTest {
@Test
public void testDiagnosticView() {
NBComponent root = new TestComponent("rootk","rootv");
TestComponent tc = new TestComponent(root, "atest", "view");
String string = tc.toString();
assertThat(string).isEqualTo("TestComponent #1305486145");
string = string.replaceAll("#[0-9]+","#ID");
assertThat(string).isEqualTo("TestComponent #ID");
}
}

View File

@@ -16,6 +16,8 @@
package io.nosqlbench.virtdata.library.basics.shared.from_long.to_collection;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import java.util.Set;
@@ -24,12 +26,13 @@ import java.util.function.*;
import static org.assertj.core.api.Assertions.assertThat;
public class SetFunctionsTest {
private final static Logger logger = LogManager.getLogger(SetFunctionsTest.class);
@Test
public void testSetFunctions() {
SetFunctions f1 = new SetFunctions((LongUnaryOperator) i -> i, (LongFunction<Double>) j -> (double) j);
Set<Object> set = f1.apply(3);
System.out.println(set);
logger.info(set);
}
@Test