remove phase and phase limiter

This commit is contained in:
Jonathan Shook 2023-05-18 14:56:56 -05:00
parent 4e6ad6db61
commit 11ad67d75f
4 changed files with 7 additions and 59 deletions

View File

@ -164,7 +164,6 @@ public abstract class BaseDriverAdapter<R extends Op, S> implements DriverAdapte
.add(Param.optional("cycles").setRegex("\\d+[KMBGTPE]?|\\d+[KMBGTPE]?\\.\\.\\d+[KMBGTPE]?").setDescription("cycle interval to use")) .add(Param.optional("cycles").setRegex("\\d+[KMBGTPE]?|\\d+[KMBGTPE]?\\.\\.\\d+[KMBGTPE]?").setDescription("cycle interval to use"))
.add(Param.optional("recycles").setDescription("allow cycles to be re-used this many times")) .add(Param.optional("recycles").setDescription("allow cycles to be re-used this many times"))
.add(Param.optional(List.of("cyclerate", "targetrate", "rate"), String.class, "rate limit for cycles per second")) .add(Param.optional(List.of("cyclerate", "targetrate", "rate"), String.class, "rate limit for cycles per second"))
.add(Param.optional("phaserate", String.class, "rate limit for phases per second"))
.add(Param.optional("seq", String.class, "sequencing algorithm")) .add(Param.optional("seq", String.class, "sequencing algorithm"))
.add(Param.optional("instrument", Boolean.class)) .add(Param.optional("instrument", Boolean.class))
.add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file")) .add(Param.optional(List.of("workload", "yaml"), String.class, "location of workload yaml file"))

View File

@ -158,33 +158,8 @@ public interface Activity extends Comparable<Activity>, ActivityDefObserver, Pro
*/ */
RateLimiter getStrideRateLimiter(Supplier<? extends RateLimiter> supplier); RateLimiter getStrideRateLimiter(Supplier<? extends RateLimiter> supplier);
/**
* Get the current phase rate limiter for this activity.
* The phase rate limiter is used to throttle the rate at which
* new phases are dispatched across all threads in an activity.
* @return The stride {@link RateLimiter}
*/
RateLimiter getPhaseLimiter();
Timer getResultTimer(); Timer getResultTimer();
/**
* Set the phase rate limiter for this activity. This method should only
* be used in a non-concurrent context. Otherwise, the supplier version
* {@link #getPhaseRateLimiter(Supplier)}} should be used.
* @param rateLimiter The phase {@link RateLimiter} for this activity.
*/
void setPhaseLimiter(RateLimiter rateLimiter);
/**
* Get or create the phase {@link RateLimiter} in a concurrent-safe
* way. Implementations should ensure that this method is synchronized or
* that each requester gets the same phase rate limiter for the activity.
* @param supplier A {@link RateLimiter} {@link Supplier}
* @return An extant or newly created phase {@link RateLimiter}
*/
RateLimiter getPhaseRateLimiter(Supplier<? extends RateLimiter> supplier);
/** /**
* Get or create the instrumentation needed for this activity. This provides * Get or create the instrumentation needed for this activity. This provides
* a single place to find and manage, and document instrumentation that is * a single place to find and manage, and document instrumentation that is

View File

@ -19,6 +19,7 @@ package io.nosqlbench.engine.api.activityimpl;
import com.codahale.metrics.Timer; import com.codahale.metrics.Timer;
import io.nosqlbench.api.config.NBLabeledElement; import io.nosqlbench.api.config.NBLabeledElement;
import io.nosqlbench.api.config.NBLabels; import io.nosqlbench.api.config.NBLabels;
import io.nosqlbench.api.config.params.ParamsParser;
import io.nosqlbench.api.config.standard.NBConfiguration; import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.metrics.ActivityMetrics; import io.nosqlbench.api.engine.metrics.ActivityMetrics;
@ -79,7 +80,6 @@ public class SimpleActivity implements Activity {
private RunState runState = RunState.Uninitialized; private RunState runState = RunState.Uninitialized;
private RateLimiter strideLimiter; private RateLimiter strideLimiter;
private RateLimiter cycleLimiter; private RateLimiter cycleLimiter;
private RateLimiter phaseLimiter;
private ActivityController activityController; private ActivityController activityController;
private ActivityInstrumentation activityInstrumentation; private ActivityInstrumentation activityInstrumentation;
private PrintWriter console; private PrintWriter console;
@ -277,30 +277,11 @@ public class SimpleActivity implements Activity {
return strideLimiter; return strideLimiter;
} }
@Override
public RateLimiter getPhaseLimiter() {
return phaseLimiter;
}
@Override @Override
public Timer getResultTimer() { public Timer getResultTimer() {
return ActivityMetrics.timer(this, "result", getParams().getOptionalInteger("hdr_digits").orElse(4)); return ActivityMetrics.timer(this, "result", getParams().getOptionalInteger("hdr_digits").orElse(4));
} }
@Override
public void setPhaseLimiter(RateLimiter rateLimiter) {
this.phaseLimiter = rateLimiter;
}
@Override
public synchronized RateLimiter getPhaseRateLimiter(Supplier<? extends RateLimiter> supplier) {
if (null == this.phaseLimiter) {
phaseLimiter = supplier.get();
}
return phaseLimiter;
}
@Override @Override
public synchronized ActivityInstrumentation getInstrumentation() { public synchronized ActivityInstrumentation getInstrumentation() {
if (null == this.activityInstrumentation) { if (null == this.activityInstrumentation) {
@ -350,10 +331,6 @@ public class SimpleActivity implements Activity {
.map(RateSpec::new).ifPresent( .map(RateSpec::new).ifPresent(
spec -> cycleLimiter = RateLimiters.createOrUpdate(this, "cycles", cycleLimiter, spec)); spec -> cycleLimiter = RateLimiters.createOrUpdate(this, "cycles", cycleLimiter, spec));
activityDef.getParams().getOptionalNamedParameter("phaserate")
.map(RateSpec::new)
.ifPresent(spec -> phaseLimiter = RateLimiters.createOrUpdate(this, "phases", phaseLimiter, spec));
} }
/** /**
@ -675,9 +652,14 @@ public class SimpleActivity implements Activity {
Optional<String> stmt = activityDef.getParams().getOptionalString("op", "stmt", "statement"); Optional<String> stmt = activityDef.getParams().getOptionalString("op", "stmt", "statement");
Optional<String> op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload"); Optional<String> op_yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
if (stmt.isPresent()) { if (stmt.isPresent()) {
String op = stmt.get();
workloadSource = "commandline:" + stmt.get(); workloadSource = "commandline:" + stmt.get();
if (op.startsWith("{")||op.startsWith("[")) {
return OpsLoader.loadString(stmt.get(), OpTemplateFormat.json, activityDef.getParams(), null);
} else {
return OpsLoader.loadString(stmt.get(), OpTemplateFormat.inline, activityDef.getParams(), null); return OpsLoader.loadString(stmt.get(), OpTemplateFormat.inline, activityDef.getParams(), null);
} }
}
if (op_yaml_loc.isPresent()) { if (op_yaml_loc.isPresent()) {
workloadSource = "yaml:" + op_yaml_loc.get(); workloadSource = "yaml:" + op_yaml_loc.get();
return OpsLoader.loadPath(op_yaml_loc.get(), activityDef.getParams(), "activities"); return OpsLoader.loadPath(op_yaml_loc.get(), activityDef.getParams(), "activities");

View File

@ -215,17 +215,14 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
strideRateLimiter.start(); strideRateLimiter.start();
} }
long strideDelay = 0L; long strideDelay = 0L;
long cycleDelay = 0L; long cycleDelay = 0L;
long phaseDelay = 0L;
// Reviewer Note: This separate of code paths was used to avoid impacting the // Reviewer Note: This separate of code paths was used to avoid impacting the
// previously logic for the SyncAction type. It may be consolidated later once // previously logic for the SyncAction type. It may be consolidated later once
// the async action is proven durable // the async action is proven durable
if (action instanceof AsyncAction) { if (action instanceof AsyncAction) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
AsyncAction<D> async = (AsyncAction) action; AsyncAction<D> async = (AsyncAction) action;
@ -387,12 +384,7 @@ public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
long cycleStart = System.nanoTime(); long cycleStart = System.nanoTime();
try { try {
logger.trace(()->"cycle " + cyclenum); logger.trace(()->"cycle " + cyclenum);
// runCycle
long phaseStart = System.nanoTime();
result = sync.runCycle(cyclenum); result = sync.runCycle(cyclenum);
long phaseEnd = System.nanoTime();
} catch (Exception e) { } catch (Exception e) {
motorState.enterState(Errored); motorState.enterState(Errored);
throw e; throw e;