mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
encapsulate update logic into token pool
This commit is contained in:
parent
0af587d28c
commit
106dc3bafc
@ -21,8 +21,9 @@ import com.codahale.metrics.Gauge;
|
|||||||
import io.nosqlbench.engine.api.activityapi.core.Startable;
|
import io.nosqlbench.engine.api.activityapi.core.Startable;
|
||||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||||
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
|
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
|
||||||
import org.apache.logging.log4j.Logger;
|
import io.nosqlbench.nb.annotations.Service;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
@ -76,6 +77,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||||||
* overall workload is not saturating resources.
|
* overall workload is not saturating resources.
|
||||||
* </p>
|
* </p>
|
||||||
*/
|
*/
|
||||||
|
@Service(value = RateLimiter.class, selector = "hybrid")
|
||||||
public class HybridRateLimiter implements Startable, RateLimiter {
|
public class HybridRateLimiter implements Startable, RateLimiter {
|
||||||
|
|
||||||
private final static Logger logger = LogManager.getLogger(HybridRateLimiter.class);
|
private final static Logger logger = LogManager.getLogger(HybridRateLimiter.class);
|
||||||
@ -94,7 +96,7 @@ public class HybridRateLimiter implements Startable, RateLimiter {
|
|||||||
private Gauge<Long> delayGauge;
|
private Gauge<Long> delayGauge;
|
||||||
private Gauge<Double> avgRateGauge;
|
private Gauge<Double> avgRateGauge;
|
||||||
private Gauge<Double> burstRateGauge;
|
private Gauge<Double> burstRateGauge;
|
||||||
private TokenPool tokens;
|
private ThreadDrivenTokenPool tokens;
|
||||||
// diagnostics
|
// diagnostics
|
||||||
|
|
||||||
// TODO Doc rate limiter scenarios, including when you want to reset the waittime, and when you don't
|
// TODO Doc rate limiter scenarios, including when you want to reset the waittime, and when you don't
|
||||||
@ -150,7 +152,7 @@ public class HybridRateLimiter implements Startable, RateLimiter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
this.rateSpec = updatingRateSpec;
|
this.rateSpec = updatingRateSpec;
|
||||||
this.tokens = (this.tokens == null) ? new TokenPool(rateSpec, activityDef) : this.tokens.apply(rateSpec);
|
this.tokens = (this.tokens == null) ? new ThreadDrivenTokenPool(rateSpec, activityDef) : this.tokens.apply(rateSpec);
|
||||||
// this.filler = (this.filler == null) ? new TokenFiller(rateSpec, activityDef) : filler.apply(rateSpec);
|
// this.filler = (this.filler == null) ? new TokenFiller(rateSpec, activityDef) : filler.apply(rateSpec);
|
||||||
// this.tokens = this.filler.getTokenPool();
|
// this.tokens = this.filler.getTokenPool();
|
||||||
|
|
||||||
|
@ -0,0 +1,269 @@
|
|||||||
|
/*
|
||||||
|
*
|
||||||
|
* Copyright 2016 jshook
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
* /
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.nosqlbench.engine.api.activityapi.ratelimits;
|
||||||
|
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||||
|
import io.nosqlbench.nb.annotations.Service;
|
||||||
|
import org.apache.logging.log4j.LogManager;
|
||||||
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
|
import static io.nosqlbench.engine.api.util.Colors.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <h2>Synopsis</h2>
|
||||||
|
*
|
||||||
|
* This TokenPool represents a finite quantity which can be
|
||||||
|
* replenished with regular refills. Extra tokens that do not fit
|
||||||
|
* within the active token pool are saved in a waiting token pool and
|
||||||
|
* used to backfill when allowed according to the backfill rate.
|
||||||
|
*
|
||||||
|
* A detailed explanation for how this works will be included
|
||||||
|
* at @link "http://docs.nosqlbench.io/" under dev notes.
|
||||||
|
*
|
||||||
|
* <p>This is the basis for the token-based rate limiters in
|
||||||
|
* NB. This mechanism is easily adaptable to bursting
|
||||||
|
* capability as well as a degree of stricter timing at speed.
|
||||||
|
* Various methods for doing this in a lock free way were
|
||||||
|
* investigated, but the intrinsic locks provided by synchronized
|
||||||
|
* method won out for now. This may be revisited when EB is
|
||||||
|
* retrofitted for J11.
|
||||||
|
* </p>
|
||||||
|
*/
|
||||||
|
@Service(value = TokenPool.class, selector = "threaded")
|
||||||
|
public class ThreadDrivenTokenPool implements TokenPool {
|
||||||
|
|
||||||
|
private final static Logger logger = LogManager.getLogger(ThreadDrivenTokenPool.class);
|
||||||
|
|
||||||
|
public static final double MIN_CONCURRENT_OPS = 2;
|
||||||
|
|
||||||
|
private long maxActivePool;
|
||||||
|
private long burstPoolSize;
|
||||||
|
private long maxOverActivePool;
|
||||||
|
private double burstRatio;
|
||||||
|
// TODO Consider removing volatile after investigating
|
||||||
|
private volatile long activePool;
|
||||||
|
private volatile long waitingPool;
|
||||||
|
private RateSpec rateSpec;
|
||||||
|
private long nanosPerOp;
|
||||||
|
// private long debugTrigger=0L;
|
||||||
|
// private long debugRate=1000000000;
|
||||||
|
private long blocks = 0L;
|
||||||
|
|
||||||
|
private TokenFiller filler;
|
||||||
|
private final ActivityDef activityDef;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This constructor tries to pick reasonable defaults for the token pool for
|
||||||
|
* a given rate spec. The active pool must be large enough to contain one
|
||||||
|
* op worth of time, and the burst ratio
|
||||||
|
*
|
||||||
|
* @param rateSpec a {@link RateSpec}
|
||||||
|
*/
|
||||||
|
public ThreadDrivenTokenPool(RateSpec rateSpec, ActivityDef activityDef) {
|
||||||
|
this.activityDef = activityDef;
|
||||||
|
apply(rateSpec);
|
||||||
|
logger.debug("initialized token pool: " + this.toString() + " for rate:" + rateSpec.toString());
|
||||||
|
// filler.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Change the settings of this token pool, and wake any blocked callers
|
||||||
|
* just in case it allows them to proceed.
|
||||||
|
*
|
||||||
|
* @param rateSpec The rate specifier.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized TokenPool apply(RateSpec rateSpec) {
|
||||||
|
this.rateSpec = rateSpec;
|
||||||
|
this.maxActivePool = Math.max((long) 1E6, (long) ((double) rateSpec.getNanosPerOp() * MIN_CONCURRENT_OPS));
|
||||||
|
this.maxOverActivePool = (long) (maxActivePool * rateSpec.getBurstRatio());
|
||||||
|
this.burstRatio = rateSpec.getBurstRatio();
|
||||||
|
|
||||||
|
this.burstPoolSize = maxOverActivePool - maxActivePool;
|
||||||
|
this.nanosPerOp = rateSpec.getNanosPerOp();
|
||||||
|
this.filler = (this.filler == null) ? new TokenFiller(rateSpec, this, activityDef) : filler.apply(rateSpec);
|
||||||
|
notifyAll();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public double getBurstRatio() {
|
||||||
|
return burstRatio;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Take tokens up to amt tokens form the pool and report
|
||||||
|
* the amount of token removed.
|
||||||
|
*
|
||||||
|
* @param amt tokens requested
|
||||||
|
* @return actual number of tokens removed, greater to or equal to zero
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized long takeUpTo(long amt) {
|
||||||
|
long take = Math.min(amt, activePool);
|
||||||
|
activePool -= take;
|
||||||
|
return take;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* wait for the given number of tokens to be available, and then remove
|
||||||
|
* them from the pool.
|
||||||
|
*
|
||||||
|
* @return the total number of tokens untaken, including wait tokens
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public synchronized long blockAndTake() {
|
||||||
|
while (activePool < nanosPerOp) {
|
||||||
|
blocks++;
|
||||||
|
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
|
||||||
|
try {
|
||||||
|
wait(maxActivePool / 1000000, (int) maxActivePool % 1000000);
|
||||||
|
} catch (InterruptedException ignored) {
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
|
||||||
|
}
|
||||||
|
//System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
|
||||||
|
|
||||||
|
activePool -= nanosPerOp;
|
||||||
|
return waitingPool + activePool;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized long blockAndTake(long tokens) {
|
||||||
|
while (activePool < tokens) {
|
||||||
|
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
|
||||||
|
try {
|
||||||
|
wait(maxActivePool / 1000000, (int) maxActivePool % 1000000);
|
||||||
|
} catch (InterruptedException ignored) {
|
||||||
|
} catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
}
|
||||||
|
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
|
||||||
|
}
|
||||||
|
//System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
|
||||||
|
|
||||||
|
activePool -= tokens;
|
||||||
|
return waitingPool + activePool;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getWaitTime() {
|
||||||
|
return activePool + waitingPool;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getWaitPool() {
|
||||||
|
return waitingPool;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getActivePool() {
|
||||||
|
return activePool;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add the given number of new tokens to the pool, forcing any amount
|
||||||
|
* that would spill over the current pool size into the wait token pool, but
|
||||||
|
* moving up to the configured burst tokens back from the wait token pool
|
||||||
|
* otherwise.
|
||||||
|
*
|
||||||
|
* The amount of backfilling that occurs is controlled by the backfill ratio,
|
||||||
|
* based on the number of tokens submitted. This causes normalizes the
|
||||||
|
* backfilling rate to the fill rate, so that it is not sensitive to refill
|
||||||
|
* scheduling.
|
||||||
|
*
|
||||||
|
* @param newTokens The number of new tokens to add to the token pools
|
||||||
|
* @return the total number of tokens in all pools
|
||||||
|
*/
|
||||||
|
public synchronized long refill(long newTokens) {
|
||||||
|
boolean debugthis = false;
|
||||||
|
// long debugAt = System.nanoTime();
|
||||||
|
// if (debugAt>debugTrigger+debugRate) {
|
||||||
|
// debugTrigger=debugAt;
|
||||||
|
// debugthis=true;
|
||||||
|
// }
|
||||||
|
|
||||||
|
long needed = Math.max(maxActivePool - activePool, 0L);
|
||||||
|
long allocatedToActivePool = Math.min(newTokens, needed);
|
||||||
|
activePool += allocatedToActivePool;
|
||||||
|
|
||||||
|
|
||||||
|
// overflow logic
|
||||||
|
long allocatedToOverflowPool = newTokens - allocatedToActivePool;
|
||||||
|
waitingPool += allocatedToOverflowPool;
|
||||||
|
|
||||||
|
// backfill logic
|
||||||
|
double refillFactor = Math.min((double) newTokens / maxActivePool, 1.0D);
|
||||||
|
long burstFillAllowed = (long) (refillFactor * burstPoolSize);
|
||||||
|
|
||||||
|
burstFillAllowed = Math.min(maxOverActivePool - activePool, burstFillAllowed);
|
||||||
|
long burstFill = Math.min(burstFillAllowed, waitingPool);
|
||||||
|
|
||||||
|
waitingPool -= burstFill;
|
||||||
|
activePool += burstFill;
|
||||||
|
|
||||||
|
if (debugthis) {
|
||||||
|
System.out.print(this);
|
||||||
|
System.out.print(ANSI_BrightBlue + " adding=" + allocatedToActivePool);
|
||||||
|
if (allocatedToOverflowPool > 0) {
|
||||||
|
System.out.print(ANSI_Red + " OVERFLOW:" + allocatedToOverflowPool + ANSI_Reset);
|
||||||
|
}
|
||||||
|
if (burstFill > 0) {
|
||||||
|
System.out.print(ANSI_BrightGreen + " BACKFILL:" + burstFill + ANSI_Reset);
|
||||||
|
}
|
||||||
|
System.out.println();
|
||||||
|
}
|
||||||
|
//System.out.println(this);
|
||||||
|
notifyAll();
|
||||||
|
|
||||||
|
return activePool + waitingPool;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "Tokens: active=" + activePool + "/" + maxActivePool
|
||||||
|
+ String.format(
|
||||||
|
" (%3.1f%%)A (%3.1f%%)B ",
|
||||||
|
(((double) activePool / (double) maxActivePool) * 100.0),
|
||||||
|
(((double) activePool / (double) maxOverActivePool) * 100.0)) + " waiting=" + waitingPool +
|
||||||
|
" blocks=" + blocks +
|
||||||
|
" rateSpec:" + ((rateSpec != null) ? rateSpec.toString() : "NULL");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RateSpec getRateSpec() {
|
||||||
|
return rateSpec;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public synchronized long restart() {
|
||||||
|
long wait = activePool + waitingPool;
|
||||||
|
activePool = 0L;
|
||||||
|
waitingPool = 0L;
|
||||||
|
filler.restart();
|
||||||
|
return wait;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void start() {
|
||||||
|
filler.start();
|
||||||
|
}
|
||||||
|
}
|
@ -35,7 +35,7 @@ public class TokenFiller implements Runnable {
|
|||||||
// (false);
|
// (false);
|
||||||
private final long interval = (long) 1E6;
|
private final long interval = (long) 1E6;
|
||||||
|
|
||||||
private final TokenPool tokenPool;
|
private final ThreadDrivenTokenPool tokenPool;
|
||||||
private volatile boolean running = true;
|
private volatile boolean running = true;
|
||||||
private RateSpec rateSpec;
|
private RateSpec rateSpec;
|
||||||
private Thread thread;
|
private Thread thread;
|
||||||
@ -43,14 +43,14 @@ public class TokenFiller implements Runnable {
|
|||||||
private final Timer timer;
|
private final Timer timer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A token filler adds tokens to a {@link TokenPool} at some rate.
|
* A token filler adds tokens to a {@link ThreadDrivenTokenPool} at some rate.
|
||||||
* By default, this rate is at least every millisecond +- scheduling jitter
|
* By default, this rate is at least every millisecond +- scheduling jitter
|
||||||
* in the JVM.
|
* in the JVM.
|
||||||
*
|
*
|
||||||
* @param rateSpec A {@link RateSpec}
|
* @param rateSpec A {@link RateSpec}
|
||||||
* @param def An {@link ActivityDef}
|
* @param def An {@link ActivityDef}
|
||||||
*/
|
*/
|
||||||
public TokenFiller(RateSpec rateSpec, TokenPool tokenPool, ActivityDef def) {
|
public TokenFiller(RateSpec rateSpec, ThreadDrivenTokenPool tokenPool, ActivityDef def) {
|
||||||
this.rateSpec = rateSpec;
|
this.rateSpec = rateSpec;
|
||||||
this.tokenPool = tokenPool;
|
this.tokenPool = tokenPool;
|
||||||
this.timer = ActivityMetrics.timer(def, "tokenfiller");
|
this.timer = ActivityMetrics.timer(def, "tokenfiller");
|
||||||
|
@ -1,256 +1,25 @@
|
|||||||
/*
|
|
||||||
*
|
|
||||||
* Copyright 2016 jshook
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
* /
|
|
||||||
*/
|
|
||||||
|
|
||||||
package io.nosqlbench.engine.api.activityapi.ratelimits;
|
package io.nosqlbench.engine.api.activityapi.ratelimits;
|
||||||
|
|
||||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
public interface TokenPool {
|
||||||
import org.apache.logging.log4j.LogManager;
|
TokenPool apply(RateSpec rateSpec);
|
||||||
import org.apache.logging.log4j.Logger;
|
|
||||||
|
|
||||||
import static io.nosqlbench.engine.api.util.Colors.*;
|
double getBurstRatio();
|
||||||
|
|
||||||
/**
|
long takeUpTo(long amt);
|
||||||
* <h2>Synopsis</h2>
|
|
||||||
*
|
|
||||||
* This TokenPool represents a finite quantity which can be
|
|
||||||
* replenished with regular refills. Extra tokens that do not fit
|
|
||||||
* within the active token pool are saved in a waiting token pool and
|
|
||||||
* used to backfill when allowed according to the backfill rate.
|
|
||||||
*
|
|
||||||
* A detailed explanation for how this works will be included
|
|
||||||
* at @link "http://docs.nosqlbench.io/" under dev notes.
|
|
||||||
*
|
|
||||||
* <p>This is the basis for the token-based rate limiters in
|
|
||||||
* NB. This mechanism is easily adaptable to bursting
|
|
||||||
* capability as well as a degree of stricter timing at speed.
|
|
||||||
* Various methods for doing this in a lock free way were
|
|
||||||
* investigated, but the intrinsic locks provided by synchronized
|
|
||||||
* method won out for now. This may be revisited when EB is
|
|
||||||
* retrofitted for J11.
|
|
||||||
* </p>
|
|
||||||
*/
|
|
||||||
public class TokenPool {
|
|
||||||
|
|
||||||
private final static Logger logger = LogManager.getLogger(TokenPool.class);
|
long blockAndTake();
|
||||||
|
|
||||||
public static final double MIN_CONCURRENT_OPS = 2;
|
long blockAndTake(long tokens);
|
||||||
|
|
||||||
private long maxActivePool;
|
long getWaitTime();
|
||||||
private long burstPoolSize;
|
|
||||||
private long maxOverActivePool;
|
|
||||||
private double burstRatio;
|
|
||||||
// TODO Consider removing volatile after investigating
|
|
||||||
private volatile long activePool;
|
|
||||||
private volatile long waitingPool;
|
|
||||||
private RateSpec rateSpec;
|
|
||||||
private long nanosPerOp;
|
|
||||||
// private long debugTrigger=0L;
|
|
||||||
// private long debugRate=1000000000;
|
|
||||||
private long blocks = 0L;
|
|
||||||
|
|
||||||
private TokenFiller filler;
|
long getWaitPool();
|
||||||
private final ActivityDef activityDef;
|
|
||||||
|
|
||||||
/**
|
long getActivePool();
|
||||||
* This constructor tries to pick reasonable defaults for the token pool for
|
|
||||||
* a given rate spec. The active pool must be large enough to contain one
|
|
||||||
* op worth of time, and the burst ratio
|
|
||||||
*
|
|
||||||
* @param rateSpec a {@link RateSpec}
|
|
||||||
*/
|
|
||||||
public TokenPool(RateSpec rateSpec, ActivityDef activityDef) {
|
|
||||||
this.activityDef = activityDef;
|
|
||||||
apply(rateSpec);
|
|
||||||
logger.debug("initialized token pool: " + this.toString() + " for rate:" + rateSpec.toString());
|
|
||||||
// filler.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
RateSpec getRateSpec();
|
||||||
* Change the settings of this token pool, and wake any blocked callers
|
|
||||||
* just in case it allows them to proceed.
|
|
||||||
*
|
|
||||||
* @param rateSpec The rate specifier.
|
|
||||||
*/
|
|
||||||
public synchronized TokenPool apply(RateSpec rateSpec) {
|
|
||||||
this.rateSpec = rateSpec;
|
|
||||||
this.maxActivePool = Math.max((long) 1E6, (long) ((double) rateSpec.getNanosPerOp() * MIN_CONCURRENT_OPS));
|
|
||||||
this.maxOverActivePool = (long) (maxActivePool * rateSpec.getBurstRatio());
|
|
||||||
this.burstRatio = rateSpec.getBurstRatio();
|
|
||||||
|
|
||||||
this.burstPoolSize = maxOverActivePool - maxActivePool;
|
long restart();
|
||||||
this.nanosPerOp = rateSpec.getNanosPerOp();
|
|
||||||
this.filler = (this.filler == null) ? new TokenFiller(rateSpec, this, activityDef) : filler.apply(rateSpec);
|
|
||||||
notifyAll();
|
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
void start();
|
||||||
public double getBurstRatio() {
|
|
||||||
return burstRatio;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Take tokens up to amt tokens form the pool and report
|
|
||||||
* the amount of token removed.
|
|
||||||
*
|
|
||||||
* @param amt tokens requested
|
|
||||||
* @return actual number of tokens removed, greater to or equal to zero
|
|
||||||
*/
|
|
||||||
public synchronized long takeUpTo(long amt) {
|
|
||||||
long take = Math.min(amt, activePool);
|
|
||||||
activePool -= take;
|
|
||||||
return take;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* wait for the given number of tokens to be available, and then remove
|
|
||||||
* them from the pool.
|
|
||||||
*
|
|
||||||
* @return the total number of tokens untaken, including wait tokens
|
|
||||||
*/
|
|
||||||
public synchronized long blockAndTake() {
|
|
||||||
while (activePool < nanosPerOp) {
|
|
||||||
blocks++;
|
|
||||||
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
|
|
||||||
try {
|
|
||||||
wait(maxActivePool / 1000000, (int) maxActivePool % 1000000);
|
|
||||||
} catch (InterruptedException ignored) {
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
|
|
||||||
}
|
|
||||||
//System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
|
|
||||||
|
|
||||||
activePool -= nanosPerOp;
|
|
||||||
return waitingPool + activePool;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized long blockAndTake(long tokens) {
|
|
||||||
while (activePool < tokens) {
|
|
||||||
//System.out.println(ANSI_BrightRed + "waiting for " + amt + "/" + activePool + " of max " + maxActivePool + ANSI_Reset);
|
|
||||||
try {
|
|
||||||
wait(maxActivePool / 1000000, (int) maxActivePool % 1000000);
|
|
||||||
} catch (InterruptedException ignored) {
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
|
||||||
//System.out.println("waited for " + amt + "/" + activePool + " tokens");
|
|
||||||
}
|
|
||||||
//System.out.println(ANSI_BrightYellow + "taking " + amt + "/" + activePool + ANSI_Reset);
|
|
||||||
|
|
||||||
activePool -= tokens;
|
|
||||||
return waitingPool + activePool;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getWaitTime() {
|
|
||||||
return activePool + waitingPool;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getWaitPool() {
|
|
||||||
return waitingPool;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getActivePool() {
|
|
||||||
return activePool;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Add the given number of new tokens to the pool, forcing any amount
|
|
||||||
* that would spill over the current pool size into the wait token pool, but
|
|
||||||
* moving up to the configured burst tokens back from the wait token pool
|
|
||||||
* otherwise.
|
|
||||||
*
|
|
||||||
* The amount of backfilling that occurs is controlled by the backfill ratio,
|
|
||||||
* based on the number of tokens submitted. This causes normalizes the
|
|
||||||
* backfilling rate to the fill rate, so that it is not sensitive to refill
|
|
||||||
* scheduling.
|
|
||||||
*
|
|
||||||
* @param newTokens The number of new tokens to add to the token pools
|
|
||||||
* @return the total number of tokens in all pools
|
|
||||||
*/
|
|
||||||
public synchronized long refill(long newTokens) {
|
|
||||||
boolean debugthis=false;
|
|
||||||
// long debugAt = System.nanoTime();
|
|
||||||
// if (debugAt>debugTrigger+debugRate) {
|
|
||||||
// debugTrigger=debugAt;
|
|
||||||
// debugthis=true;
|
|
||||||
// }
|
|
||||||
|
|
||||||
long needed = Math.max(maxActivePool - activePool, 0L);
|
|
||||||
long allocatedToActivePool = Math.min(newTokens, needed);
|
|
||||||
activePool += allocatedToActivePool;
|
|
||||||
|
|
||||||
|
|
||||||
// overflow logic
|
|
||||||
long allocatedToOverflowPool = newTokens - allocatedToActivePool;
|
|
||||||
waitingPool += allocatedToOverflowPool;
|
|
||||||
|
|
||||||
// backfill logic
|
|
||||||
double refillFactor = Math.min((double) newTokens / maxActivePool, 1.0D);
|
|
||||||
long burstFillAllowed =(long) (refillFactor* burstPoolSize);
|
|
||||||
|
|
||||||
burstFillAllowed = Math.min(maxOverActivePool - activePool, burstFillAllowed);
|
|
||||||
long burstFill = Math.min(burstFillAllowed, waitingPool);
|
|
||||||
|
|
||||||
waitingPool -= burstFill;
|
|
||||||
activePool += burstFill;
|
|
||||||
|
|
||||||
if (debugthis) {
|
|
||||||
System.out.print(this);
|
|
||||||
System.out.print(ANSI_BrightBlue + " adding=" + allocatedToActivePool);
|
|
||||||
if (allocatedToOverflowPool>0) {
|
|
||||||
System.out.print(ANSI_Red + " OVERFLOW:" + allocatedToOverflowPool + ANSI_Reset);
|
|
||||||
}
|
|
||||||
if (burstFill>0) {
|
|
||||||
System.out.print(ANSI_BrightGreen + " BACKFILL:" + burstFill + ANSI_Reset);
|
|
||||||
}
|
|
||||||
System.out.println();
|
|
||||||
}
|
|
||||||
//System.out.println(this);
|
|
||||||
notifyAll();
|
|
||||||
|
|
||||||
return activePool+waitingPool;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public String toString() {
|
|
||||||
return "Tokens: active=" + activePool +"/" + maxActivePool
|
|
||||||
+ String.format(
|
|
||||||
" (%3.1f%%)A (%3.1f%%)B ",
|
|
||||||
(((double)activePool/(double)maxActivePool)*100.0),
|
|
||||||
(((double)activePool/(double)maxOverActivePool)*100.0)) + " waiting=" + waitingPool +
|
|
||||||
" blocks=" + blocks +
|
|
||||||
" rateSpec:"+ ((rateSpec!=null) ? rateSpec.toString() : "NULL");
|
|
||||||
}
|
|
||||||
|
|
||||||
public RateSpec getRateSpec() {
|
|
||||||
return rateSpec;
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized long restart() {
|
|
||||||
long wait = activePool + waitingPool;
|
|
||||||
activePool = 0L;
|
|
||||||
waitingPool = 0L;
|
|
||||||
filler.restart();
|
|
||||||
return wait;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void start() {
|
|
||||||
filler.start();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,7 @@ public class TokenPoolTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBackfillFullRate() {
|
public void testBackfillFullRate() {
|
||||||
TokenPool p = new TokenPool(new RateSpec(10000000, 1.1), def);
|
ThreadDrivenTokenPool p = new ThreadDrivenTokenPool(new RateSpec(10000000, 1.1), def);
|
||||||
assertThat(p.refill(1000000L)).isEqualTo(1000000L);
|
assertThat(p.refill(1000000L)).isEqualTo(1000000L);
|
||||||
assertThat(p.getWaitPool()).isEqualTo(0L);
|
assertThat(p.getWaitPool()).isEqualTo(0L);
|
||||||
assertThat(p.refill(100L)).isEqualTo(1000100);
|
assertThat(p.refill(100L)).isEqualTo(1000100);
|
||||||
@ -43,7 +43,7 @@ public class TokenPoolTest {
|
|||||||
}
|
}
|
||||||
@Test
|
@Test
|
||||||
public void testTakeRanges() {
|
public void testTakeRanges() {
|
||||||
TokenPool p = new TokenPool(new RateSpec(100, 10), def);
|
ThreadDrivenTokenPool p = new ThreadDrivenTokenPool(new RateSpec(100, 10), def);
|
||||||
p.refill(100);
|
p.refill(100);
|
||||||
assertThat(p.takeUpTo(99)).isEqualTo(99L);
|
assertThat(p.takeUpTo(99)).isEqualTo(99L);
|
||||||
assertThat(p.takeUpTo(10)).isEqualTo(1L);
|
assertThat(p.takeUpTo(10)).isEqualTo(1L);
|
||||||
@ -54,7 +54,7 @@ public class TokenPoolTest {
|
|||||||
public void testChangedParameters() {
|
public void testChangedParameters() {
|
||||||
|
|
||||||
RateSpec s1 = new RateSpec(1000L, 1.10D);
|
RateSpec s1 = new RateSpec(1000L, 1.10D);
|
||||||
TokenPool p = new TokenPool(s1, def);
|
ThreadDrivenTokenPool p = new ThreadDrivenTokenPool(s1, def);
|
||||||
long r = p.refill(10000000);
|
long r = p.refill(10000000);
|
||||||
assertThat(r).isEqualTo(10000000L);
|
assertThat(r).isEqualTo(10000000L);
|
||||||
assertThat(p.getWaitTime()).isEqualTo(10000000L);
|
assertThat(p.getWaitTime()).isEqualTo(10000000L);
|
||||||
|
Loading…
Reference in New Issue
Block a user