Merge pull request #369 from lhotari/lh-rate-limit-per-thread

Make ratelimiter instance thread specific for PulsarActivity
This commit is contained in:
Jonathan Shook 2021-10-19 14:33:33 -05:00 committed by GitHub
commit b82b4b1a43
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -10,6 +10,8 @@ import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf;
import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver; import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence; import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter;
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiters;
import io.nosqlbench.engine.api.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityimpl.ActivityDef;
import io.nosqlbench.engine.api.activityimpl.OpDispenser; import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
@ -57,6 +59,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
private NBErrorHandler errorHandler; private NBErrorHandler errorHandler;
private OpSequence<OpDispenser<PulsarOp>> sequencer; private OpSequence<OpDispenser<PulsarOp>> sequencer;
private volatile Throwable asyncOperationFailure; private volatile Throwable asyncOperationFailure;
private boolean cycleratePerThread;
public PulsarActivity(ActivityDef activityDef) { public PulsarActivity(ActivityDef activityDef) {
super(activityDef); super(activityDef);
@ -108,11 +111,26 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
() -> activityDef.getParams().getOptionalString("errors").orElse("stop"), () -> activityDef.getParams().getOptionalString("errors").orElse("stop"),
this::getExceptionMetrics this::getExceptionMetrics
); );
cycleratePerThread = activityDef.getParams().takeBoolOrDefault("cyclerate_per_thread", false);
} }
private final ThreadLocal<RateLimiter> cycleLimiterThreadLocal = ThreadLocal.withInitial(() -> {
if (super.getCycleLimiter() != null) {
return RateLimiters.createOrUpdate(this.getActivityDef(), "cycles", null,
super.getCycleLimiter().getRateSpec());
} else {
return null;
}
});
@Override @Override
public synchronized void onActivityDefUpdate(ActivityDef activityDef) { public RateLimiter getCycleLimiter() {
super.onActivityDefUpdate(activityDef); if (cycleratePerThread) {
return cycleLimiterThreadLocal.get();
} else {
return super.getCycleLimiter();
}
} }
public NBErrorHandler getErrorHandler() { return errorHandler; } public NBErrorHandler getErrorHandler() { return errorHandler; }