encapsulate update logic into token pool

This commit is contained in:
Jonathan Shook
2020-12-02 14:40:20 -06:00
parent 1e409332a8
commit 0af587d28c
5 changed files with 48 additions and 40 deletions

View File

@@ -80,7 +80,7 @@ public class HybridRateLimiter implements Startable, RateLimiter {
private final static Logger logger = LogManager.getLogger(HybridRateLimiter.class); private final static Logger logger = LogManager.getLogger(HybridRateLimiter.class);
private volatile TokenFiller filler; //private volatile TokenFiller filler;
private volatile long starttime; private volatile long starttime;
// rate controls // rate controls
@@ -150,8 +150,9 @@ public class HybridRateLimiter implements Startable, RateLimiter {
} }
this.rateSpec = updatingRateSpec; this.rateSpec = updatingRateSpec;
this.filler = (this.filler == null) ? new TokenFiller(rateSpec, activityDef) : filler.apply(rateSpec); this.tokens = (this.tokens == null) ? new TokenPool(rateSpec, activityDef) : this.tokens.apply(rateSpec);
this.tokens = this.filler.getTokenPool(); // this.filler = (this.filler == null) ? new TokenFiller(rateSpec, activityDef) : filler.apply(rateSpec);
// this.tokens = this.filler.getTokenPool();
if (this.state == State.Idle && updatingRateSpec.isAutoStart()) { if (this.state == State.Idle && updatingRateSpec.isAutoStart()) {
this.start(); this.start();
@@ -177,7 +178,7 @@ public class HybridRateLimiter implements Startable, RateLimiter {
case Idle: case Idle:
long nanos = getNanoClockTime(); long nanos = getNanoClockTime();
this.starttime = nanos; this.starttime = nanos;
this.filler.start(); this.tokens.start();
state = State.Started; state = State.Started;
break; break;
} }
@@ -191,7 +192,7 @@ public class HybridRateLimiter implements Startable, RateLimiter {
case Started: case Started:
long accumulatedWaitSinceLastStart = cumulativeWaitTimeNanos.get(); long accumulatedWaitSinceLastStart = cumulativeWaitTimeNanos.get();
cumulativeWaitTimeNanos.set(0L); cumulativeWaitTimeNanos.set(0L);
return this.filler.restart() + accumulatedWaitSinceLastStart; return this.tokens.restart() + accumulatedWaitSinceLastStart;
default: default:
return 0L; return 0L;
} }
@@ -215,14 +216,14 @@ public class HybridRateLimiter implements Startable, RateLimiter {
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(HybridRateLimiter.class.getSimpleName()); StringBuilder sb = new StringBuilder(HybridRateLimiter.class.getSimpleName());
if (this.getRateSpec()!=null) { if (this.getRateSpec() != null) {
sb.append(" spec=").append(this.getRateSpec().toString()); sb.append(" spec=").append(this.getRateSpec().toString());
} }
if (this.state!=null) { if (this.state != null) {
sb.append(" state=").append(this.state); sb.append(" state=").append(this.state);
} }
if (this.filler !=null) { if (this.tokens != null) {
sb.append(" filler=").append(this.filler.toString()); sb.append(" tokens=").append(this.tokens.toString());
} }
return sb.toString(); return sb.toString();
} }
@@ -245,7 +246,7 @@ public class HybridRateLimiter implements Startable, RateLimiter {
@Override @Override
public Long getValue() { public Long getValue() {
TokenPool pool = rl.filler.getTokenPool(); TokenPool pool = rl.tokens;
if (pool==null) { if (pool==null) {
return 0L; return 0L;
} }

View File

@@ -48,18 +48,16 @@ public class TokenFiller implements Runnable {
* in the JVM. * in the JVM.
* *
* @param rateSpec A {@link RateSpec} * @param rateSpec A {@link RateSpec}
* @param def An {@link ActivityDef} * @param def An {@link ActivityDef}
*/ */
public TokenFiller(RateSpec rateSpec, ActivityDef def) { public TokenFiller(RateSpec rateSpec, TokenPool tokenPool, ActivityDef def) {
this.rateSpec = rateSpec; this.rateSpec = rateSpec;
this.tokenPool= new TokenPool(rateSpec); this.tokenPool = tokenPool;
this.tokenPool.refill(rateSpec.getNanosPerOp());
this.timer = ActivityMetrics.timer(def, "tokenfiller"); this.timer = ActivityMetrics.timer(def, "tokenfiller");
} }
public TokenFiller apply(RateSpec rateSpec) { public TokenFiller apply(RateSpec rateSpec) {
this.rateSpec = rateSpec; this.rateSpec = rateSpec;
this.tokenPool.apply(rateSpec);
return this; return this;
} }
@@ -99,6 +97,8 @@ public class TokenFiller implements Runnable {
} }
public TokenFiller start() { public TokenFiller start() {
this.tokenPool.refill(rateSpec.getNanosPerOp());
thread = new Thread(this); thread = new Thread(this);
thread.setName(this.toString()); thread.setName(this.toString());
thread.setPriority(Thread.MAX_PRIORITY); thread.setPriority(Thread.MAX_PRIORITY);

View File

@@ -17,8 +17,9 @@
package io.nosqlbench.engine.api.activityapi.ratelimits; package io.nosqlbench.engine.api.activityapi.ratelimits;
import org.apache.logging.log4j.Logger; import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import static io.nosqlbench.engine.api.util.Colors.*; import static io.nosqlbench.engine.api.util.Colors.*;
@@ -57,10 +58,12 @@ public class TokenPool {
private volatile long waitingPool; private volatile long waitingPool;
private RateSpec rateSpec; private RateSpec rateSpec;
private long nanosPerOp; private long nanosPerOp;
// private long debugTrigger=0L; // private long debugTrigger=0L;
// private long debugRate=1000000000; // private long debugRate=1000000000;
private long blocks = 0L; private long blocks = 0L;
private TokenFiller filler;
private final ActivityDef activityDef;
/** /**
* This constructor tries to pick reasonable defaults for the token pool for * This constructor tries to pick reasonable defaults for the token pool for
@@ -69,16 +72,11 @@ public class TokenPool {
* *
* @param rateSpec a {@link RateSpec} * @param rateSpec a {@link RateSpec}
*/ */
public TokenPool(RateSpec rateSpec) { public TokenPool(RateSpec rateSpec, ActivityDef activityDef) {
this.activityDef = activityDef;
apply(rateSpec); apply(rateSpec);
logger.debug("initialized token pool: " + this.toString() + " for rate:" + rateSpec.toString()); logger.debug("initialized token pool: " + this.toString() + " for rate:" + rateSpec.toString());
} // filler.start();
public TokenPool(long poolsize, double burstRatio) {
this.maxActivePool = poolsize;
this.burstRatio = burstRatio;
this.maxOverActivePool = (long) (maxActivePool * burstRatio);
this.burstPoolSize = maxOverActivePool - maxActivePool;
} }
/** /**
@@ -87,15 +85,17 @@ public class TokenPool {
* *
* @param rateSpec The rate specifier. * @param rateSpec The rate specifier.
*/ */
public synchronized void apply(RateSpec rateSpec) { public synchronized TokenPool apply(RateSpec rateSpec) {
this.rateSpec=rateSpec; this.rateSpec = rateSpec;
this.maxActivePool = Math.max((long) 1E6, (long) ((double) rateSpec.getNanosPerOp() * MIN_CONCURRENT_OPS)); this.maxActivePool = Math.max((long) 1E6, (long) ((double) rateSpec.getNanosPerOp() * MIN_CONCURRENT_OPS));
this.maxOverActivePool = (long) (maxActivePool * rateSpec.getBurstRatio()); this.maxOverActivePool = (long) (maxActivePool * rateSpec.getBurstRatio());
this.burstRatio = rateSpec.getBurstRatio(); this.burstRatio = rateSpec.getBurstRatio();
this.burstPoolSize = maxOverActivePool - maxActivePool; this.burstPoolSize = maxOverActivePool - maxActivePool;
this.nanosPerOp = rateSpec.getNanosPerOp(); this.nanosPerOp = rateSpec.getNanosPerOp();
this.filler = (this.filler == null) ? new TokenFiller(rateSpec, this, activityDef) : filler.apply(rateSpec);
notifyAll(); notifyAll();
return this;
} }
@@ -243,10 +243,14 @@ public class TokenPool {
} }
public synchronized long restart() { public synchronized long restart() {
long wait=activePool+waitingPool; long wait = activePool + waitingPool;
activePool=0L; activePool = 0L;
waitingPool=0L; waitingPool = 0L;
filler.restart();
return wait; return wait;
}
public void start() {
filler.start();
} }
} }

View File

@@ -17,30 +17,33 @@
package io.nosqlbench.engine.api.activityapi.ratelimits; package io.nosqlbench.engine.api.activityapi.ratelimits;
import org.junit.Ignore; import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
import org.junit.Test; import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
public class TokenPoolTest { public class TokenPoolTest {
ActivityDef def = new ActivityDef(ParameterMap.parseOrException("alias=testing"));
@Test @Test
public void testBackfillFullRate() { public void testBackfillFullRate() {
TokenPool p = new TokenPool(100, 1.1); TokenPool p = new TokenPool(new RateSpec(10000000, 1.1), def);
assertThat(p.refill(100L)).isEqualTo(100L); assertThat(p.refill(1000000L)).isEqualTo(1000000L);
assertThat(p.getWaitPool()).isEqualTo(0L); assertThat(p.getWaitPool()).isEqualTo(0L);
assertThat(p.refill(100L)).isEqualTo(200); assertThat(p.refill(100L)).isEqualTo(1000100);
assertThat(p.getWaitPool()).isEqualTo(90L); assertThat(p.getWaitPool()).isEqualTo(90L);
assertThat(p.refill(10L)).isEqualTo(210L); assertThat(p.refill(10L)).isEqualTo(1000110L);
assertThat(p.getWaitPool()).isEqualTo(100L); assertThat(p.getWaitPool()).isEqualTo(99L);
assertThat(p.refill(10)).isEqualTo(220L); assertThat(p.refill(10)).isEqualTo(1000120L);
assertThat(p.takeUpTo(100)).isEqualTo(100L); assertThat(p.takeUpTo(100)).isEqualTo(100L);
} }
@Test @Test
public void testTakeRanges() { public void testTakeRanges() {
TokenPool p = new TokenPool(100, 10); TokenPool p = new TokenPool(new RateSpec(100, 10), def);
p.refill(100); p.refill(100);
assertThat(p.takeUpTo(99)).isEqualTo(99L); assertThat(p.takeUpTo(99)).isEqualTo(99L);
assertThat(p.takeUpTo(10)).isEqualTo(1L); assertThat(p.takeUpTo(10)).isEqualTo(1L);
@@ -51,7 +54,7 @@ public class TokenPoolTest {
public void testChangedParameters() { public void testChangedParameters() {
RateSpec s1 = new RateSpec(1000L, 1.10D); RateSpec s1 = new RateSpec(1000L, 1.10D);
TokenPool p = new TokenPool(s1); TokenPool p = new TokenPool(s1, def);
long r = p.refill(10000000); long r = p.refill(10000000);
assertThat(r).isEqualTo(10000000L); assertThat(r).isEqualTo(10000000L);
assertThat(p.getWaitTime()).isEqualTo(10000000L); assertThat(p.getWaitTime()).isEqualTo(10000000L);

View File

@@ -37,7 +37,7 @@
<jaxb.impl.version>2.4.0-b180830.0438</jaxb.impl.version> <jaxb.impl.version>2.4.0-b180830.0438</jaxb.impl.version>
<jersey.media.version>2.29</jersey.media.version> <jersey.media.version>2.29</jersey.media.version>
<jersey.version>2.27</jersey.version> <jersey.version>2.27</jersey.version>
<jetty.version>9.4.25.v20191220</jetty.version> <jetty.version>9.4.35.v20201120</jetty.version>
<jmh.version>1.22</jmh.version> <jmh.version>1.22</jmh.version>
<joda.time.version>2.9.9</joda.time.version> <joda.time.version>2.9.9</joda.time.version>
<junit.jupiter.version>5.3.2</junit.jupiter.version> <junit.jupiter.version>5.3.2</junit.jupiter.version>