diff --git a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java index e3aa48163..af7f64744 100644 --- a/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java +++ b/driver-pulsar/src/main/java/io/nosqlbench/driver/pulsar/PulsarActivity.java @@ -10,6 +10,8 @@ import io.nosqlbench.driver.pulsar.util.PulsarNBClientConf; import io.nosqlbench.engine.api.activityapi.core.ActivityDefObserver; import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; import io.nosqlbench.engine.api.activityapi.planning.OpSequence; +import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiter; +import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiters; import io.nosqlbench.engine.api.activityimpl.ActivityDef; import io.nosqlbench.engine.api.activityimpl.OpDispenser; import io.nosqlbench.engine.api.activityimpl.SimpleActivity; @@ -57,6 +59,7 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve private NBErrorHandler errorHandler; private OpSequence> sequencer; private volatile Throwable asyncOperationFailure; + private boolean cycleratePerThread; public PulsarActivity(ActivityDef activityDef) { super(activityDef); @@ -108,11 +111,26 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve () -> activityDef.getParams().getOptionalString("errors").orElse("stop"), this::getExceptionMetrics ); + + cycleratePerThread = activityDef.getParams().takeBoolOrDefault("cyclerate_per_thread", false); } + private final ThreadLocal cycleLimiterThreadLocal = ThreadLocal.withInitial(() -> { + if (super.getCycleLimiter() != null) { + return RateLimiters.createOrUpdate(this.getActivityDef(), "cycles", null, + super.getCycleLimiter().getRateSpec()); + } else { + return null; + } + }); + @Override - public synchronized void onActivityDefUpdate(ActivityDef activityDef) { - super.onActivityDefUpdate(activityDef); + public RateLimiter getCycleLimiter() { + if (cycleratePerThread) { + return cycleLimiterThreadLocal.get(); + } else { + return super.getCycleLimiter(); + } } public NBErrorHandler getErrorHandler() { return errorHandler; }