nosqlbench-1323 Add support for cyclerate_per_thread in NB5

This commit is contained in:
Jonathan Shook 2024-04-15 23:41:43 -05:00
parent 4c30fd7cf6
commit 73cd1f1fad
8 changed files with 180 additions and 119 deletions

View File

@ -74,7 +74,7 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
}
@Override
public NBComponent attachChild(NBComponent... children) {
public synchronized NBComponent attachChild(NBComponent... children) {
for (NBComponent child : children) {
logger.debug(() -> "attaching " + child.description() + " to parent " + this.description());

View File

@ -106,22 +106,6 @@ public interface Activity extends Comparable<Activity>, ActivityDefObserver, Pro
*/
RateLimiter getCycleLimiter();
/**
* Set the cycle rate limiter for this activity. This method should only
* be used non-concurrently. Otherwise, the supplier version
* {@link #getCycleRateLimiter(Supplier)} should be used.
* @param rateLimiter The cycle {@link RateLimiter} for this activity
*/
void setCycleLimiter(RateLimiter rateLimiter);
/**
* Get or create the cycle rate limiter in a safe way. Implementations
* should ensure that this method is synchronized or that each requester
* gets the same cycle rate limiter for the activity.
* @param supplier A {@link RateLimiter} {@link Supplier}
* @return An extant or newly created cycle {@link RateLimiter}
*/
RateLimiter getCycleRateLimiter(Supplier<? extends RateLimiter> supplier);
/**
* Get the current stride rate limiter for this activity.
@ -131,23 +115,6 @@ public interface Activity extends Comparable<Activity>, ActivityDefObserver, Pro
*/
RateLimiter getStrideLimiter();
/**
* Set the stride rate limiter for this activity. This method should only
* be used non-concurrently. Otherwise, the supplier version
* {@link #getStrideRateLimiter(Supplier)}} should be used.
* @param rateLimiter The stride {@link RateLimiter} for this activity.
*/
void setStrideLimiter(RateLimiter rateLimiter);
/**
* Get or create the stride {@link RateLimiter} in a concurrent-safe
* way. Implementations should ensure that this method is synchronized or
* that each requester gets the same stride rate limiter for the activity.
* @param supplier A {@link RateLimiter} {@link Supplier}
* @return An extant or newly created stride {@link RateLimiter}
*/
RateLimiter getStrideRateLimiter(Supplier<? extends RateLimiter> supplier);
/**
* Get or create the instrumentation needed for this activity. This provides
* a single place to find and manage, and document instrumentation that is

View File

@ -79,7 +79,11 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
private long startTime;
public SimRate(NBComponent parent, SimRateSpec spec) {
super(parent, NBLabels.forKV().and("rateType",
this(parent, spec, NBLabels.forKV());
}
public SimRate(NBComponent parent, SimRateSpec spec, NBLabels extraLabels) {
super(parent, extraLabels.and("rateType",
(spec instanceof CycleRateSpec? "cycle" : "stride")));
this.spec = spec;
initMetrics();

View File

@ -49,7 +49,6 @@ import java.time.temporal.ChronoUnit;
* </UL>
* <p>
* Where:
*
* <EM>rate</EM> is the ops per second, expressed as any positive floating point value.
* <EM>burst ratio</EM> is a floating point value greater than 1.0 which determines how much faster
* the rate limiter may go to catch up to the overall.
@ -128,8 +127,13 @@ public class SimRateSpec {
public static final double DEFAULT_RATE_OPS_S = 1.0D;
public static final double DEFAULT_BURST_RATIO = 1.1D;
public static Verb DEFAULT_VERB = Verb.start;
public enum Scope {
thread,
activity
}
public ChronoUnit unit;
private Scope scope = Scope.activity;
/**
* Target rate in Operations Per Second
@ -212,16 +216,23 @@ public class SimRateSpec {
public SimRateSpec(double opsPerSec, double burstRatio) {
this(opsPerSec, burstRatio, DEFAULT_VERB);
}
public SimRateSpec(double opsPerSec, double burstRatio, Verb type) {
apply(opsPerSec, burstRatio, verb);
public SimRateSpec(double opsPerSec, double burstRatio, Verb verb) {
apply(opsPerSec, burstRatio, verb, Scope.activity);
}
public SimRateSpec(double opsPerSec, double burstRatio, Scope scope) {
apply(opsPerSec, burstRatio, DEFAULT_VERB, scope);
}
public SimRateSpec(double opsPerSec, double burstRatio, Verb verb, Scope scope) {
apply(opsPerSec, burstRatio, verb, scope);
}
private void apply(double opsPerSec, double burstRatio, Verb verb) {
private void apply(double opsPerSec, double burstRatio, Verb verb, Scope scope) {
this.opsPerSec = opsPerSec;
this.burstRatio = burstRatio;
this.verb = verb;
this.unit = chronoUnitFor(opsPerSec);
this.scope = scope;
// TODO: include burst into ticks calculation
}
@ -245,13 +256,21 @@ public class SimRateSpec {
public SimRateSpec(String spec) {
String[] specs = spec.split("[,:;]");
Verb verb = Verb.start;
double burstRatio = DEFAULT_BURST_RATIO;
double opsPerSec;
double burstRatio = DEFAULT_BURST_RATIO;
Verb verb = Verb.start;
Scope scope = Scope.activity;
switch (specs.length) {
case 4:
scope = Scope.valueOf(specs[3].toLowerCase());
case 3:
verb = Verb.valueOf(specs[2].toLowerCase());
logger.debug("selected rate limiter type: " + verb);
try {
scope = Scope.valueOf(specs[2].toLowerCase());
logger.debug("selected rate limiter scope: " + scope);
} catch (IllegalArgumentException iae) {
verb = Verb.valueOf(specs[2].toLowerCase());
logger.debug("selected rate limiter type: " + verb);
}
case 2:
burstRatio = Double.valueOf(specs[1]);
if (burstRatio < 1.0) {
@ -263,7 +282,7 @@ public class SimRateSpec {
default:
throw new RuntimeException("Rate specs must be either '<rate>' or '<rate>:<burstRatio>' as in 5000.0 or 5000.0:1.0");
}
apply(opsPerSec, burstRatio, verb);
apply(opsPerSec, burstRatio, verb, scope);
}
public String toString() {
@ -319,5 +338,10 @@ public class SimRateSpec {
return this.verb == Verb.restart;
}
public Scope getScope() {
return this.scope;
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.simrate;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.Supplier;
public class ThreadLocalRateLimiters {
private static final Logger logger = LogManager.getLogger(ThreadLocalRateLimiters.class);
public static synchronized ThreadLocal<RateLimiter> createOrUpdate(
final NBComponent parent,
final ThreadLocal<RateLimiter> extantSource,
final SimRateSpec spec
) {
if (extantSource != null) {
RateLimiter rl = extantSource.get();
rl.applyRateSpec(spec);
return extantSource;
} else {
Supplier<RateLimiter> rls;
rls = switch (spec.getScope()) {
case activity -> {
SimRate rl = new SimRate(parent, spec);
yield () -> rl;
}
case thread -> () -> new SimRate(
parent,
spec,
NBLabels.forKV("thread", Thread.currentThread().getName())
);
};
return ThreadLocal.withInitial(rls);
}
}
}

View File

@ -18,6 +18,7 @@ package io.nosqlbench.engine.api.activityimpl;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.EmitterOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.engine.api.activityapi.simrate.*;
import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.components.events.ParamChange;
@ -28,9 +29,6 @@ import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
import io.nosqlbench.engine.api.activityapi.simrate.RateLimiters;
import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec;
import io.nosqlbench.engine.api.activityapi.simrate.SimRateSpec;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.nb.api.components.status.NBStatusComponent;
@ -42,12 +40,10 @@ import io.nosqlbench.engine.api.activityapi.cyclelog.filters.IntPredicateDispens
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
import io.nosqlbench.engine.api.activityapi.simrate.RateLimiter;
import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
import io.nosqlbench.engine.api.activityapi.simrate.StrideRateSpec;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.DryRunOpDispenserWrapper;
@ -78,8 +74,8 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
private OutputDispenser markerDispenser;
private IntPredicateDispenser resultFilterDispenser;
private RunState runState = RunState.Uninitialized;
private RateLimiter strideLimiter;
private RateLimiter cycleLimiter;
private ThreadLocal<RateLimiter> strideLimiterSource;
private ThreadLocal<RateLimiter> cycleLimiterSource;
private ActivityInstrumentation activityInstrumentation;
private PrintWriter console;
private long startedAtMillis;
@ -229,41 +225,21 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
@Override
public RateLimiter getCycleLimiter() {
return this.cycleLimiter;
}
@Override
public synchronized void setCycleLimiter(RateLimiter rateLimiter) {
this.cycleLimiter = rateLimiter;
}
@Override
public synchronized RateLimiter getCycleRateLimiter(Supplier<? extends RateLimiter> s) {
if (null == this.cycleLimiter) {
cycleLimiter = s.get();
if (cycleLimiterSource!=null) {
return cycleLimiterSource.get();
} else {
return null;
}
return cycleLimiter;
}
@Override
public synchronized RateLimiter getStrideLimiter() {
return this.strideLimiter;
}
@Override
public synchronized void setStrideLimiter(RateLimiter rateLimiter) {
this.strideLimiter = rateLimiter;
}
@Override
public synchronized RateLimiter getStrideRateLimiter(Supplier<? extends RateLimiter> s) {
if (null == this.strideLimiter) {
strideLimiter = s.get();
if (strideLimiterSource!=null) {
return strideLimiterSource.get();
} else {
return null;
}
return strideLimiter;
}
@Override
public synchronized ActivityInstrumentation getInstrumentation() {
if (null == this.activityInstrumentation) {
@ -306,6 +282,8 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) {
// cycleratePerThread = activityDef.getParams().takeBoolOrDefault("cyclerate_per_thread", false);
activityDef.getParams().getOptionalNamedParameter("striderate")
.map(StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
@ -315,13 +293,14 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
}
public void createOrUpdateStrideLimiter(SimRateSpec spec) {
strideLimiter = RateLimiters.createOrUpdate(this, strideLimiter, spec);
strideLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, strideLimiterSource, spec);
}
public void createOrUpdateCycleLimiter(SimRateSpec spec) {
cycleLimiter = RateLimiters.createOrUpdate(this, cycleLimiter, spec);
cycleLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, cycleLimiterSource, spec);
}
/**
* Modify the provided ActivityDef with defaults for stride and cycles, if they haven't been provided, based on the
* length of the sequence as determined by the provided ratios. Also, modify the ActivityDef with reasonable
@ -344,10 +323,7 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
if (cyclesOpt.isEmpty()) {
String cycles = getParams().getOptionalString("stride").orElseThrow();
logger.info(() -> "defaulting cycles to " + cycles + " (the stride length)");
// getParams().set("cycles", getParams().getOptionalString("stride").orElseThrow());
// getParams().setSilently("cycles", getParams().getOptionalString("stride").orElseThrow());
this.getActivityDef().setCycles(getParams().getOptionalString("stride").orElseThrow());
// getParams().set("cycles", getParams().getOptionalString("stride").orElseThrow());
} else {
if (0 == activityDef.getCycleCount()) {
throw new RuntimeException(
@ -680,4 +656,22 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
public Map<String, String> asResult() {
return Map.of("activity",this.getAlias());
}
// private final ThreadLocal<RateLimiter> cycleLimiterThreadLocal = ThreadLocal.withInitial(() -> {
// RateLimiters.createOrUpdate(this,null,new SimRateSpec()
// if (cycleratePerThread) {
// return RateLimiters.createOrUpdate(new NBThreadComponent(this),null,)
// } else {
// RateLimiters.createOrUpdate(new NBThreadComponent(this),null,activityDef)
// }
// if (getCycleLimiter() != null) {
// return RateLimiters.createOrUpdate(
// new NBThreadComponent(this),
// getCycleLimiter(),
// getCycleLimiter().getSpec());
// } else {
// return null;
// }
// });
}

View File

@ -54,16 +54,18 @@ import java.util.concurrent.ConcurrentHashMap;
* core of all new activity types. Extant NB drivers should also migrate
* to this when possible.
*
* @param <R> A type of runnable which wraps the operations for this type of driver.
* @param <S> The context type for the activity, AKA the 'space' for a named driver instance and its associated object graph
* @param <R>
* A type of runnable which wraps the operations for this type of driver.
* @param <S>
* The context type for the activity, AKA the 'space' for a named driver instance and its associated object graph
*/
public class StandardActivity<R extends Op, S> extends SimpleActivity implements SyntheticOpTemplateProvider, ActivityDefObserver {
private static final Logger logger = LogManager.getLogger("ACTIVITY");
private final OpSequence<OpDispenser<? extends Op>> sequence;
private final ConcurrentHashMap<String, DriverAdapter<?,?>> adapters = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, DriverAdapter<?, ?>> adapters = new ConcurrentHashMap<>();
public StandardActivity(NBComponent parent, ActivityDef activityDef) {
super(parent,activityDef);
super(parent, activityDef);
OpsDocList workload;
Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
@ -77,9 +79,9 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
}
Optional<String> defaultDriverName = activityDef.getParams().getOptionalString("driver");
Optional<DriverAdapter<?,?>> defaultAdapter = defaultDriverName
.flatMap(name -> ServiceSelector.of(name,ServiceLoader.load(DriverAdapterLoader.class)).get())
.map(l -> l.load(this,NBLabels.forKV()));
Optional<DriverAdapter<?, ?>> defaultAdapter = defaultDriverName
.flatMap(name -> ServiceSelector.of(name, ServiceLoader.load(DriverAdapterLoader.class)).get())
.map(l -> l.load(this, NBLabels.forKV()));
if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) {
throw new BasicError("Unable to load default driver adapter '" + defaultDriverName.get() + '\'');
@ -90,7 +92,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
List<ParsedOp> pops = new ArrayList<>();
List<DriverAdapter<?,?>> adapterlist = new ArrayList<>();
List<DriverAdapter<?, ?>> adapterlist = new ArrayList<>();
NBConfigModel supersetConfig = ConfigModel.of(StandardActivity.class).add(yamlmodel);
Optional<String> defaultDriverOption = defaultDriverName;
@ -107,16 +109,15 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
// .orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
// HERE
if (!adapters.containsKey(driverName)) {
DriverAdapter<?,?> adapter = Optional.of(driverName)
DriverAdapter<?, ?> adapter = Optional.of(driverName)
.flatMap(
name -> ServiceSelector.of(
name,
ServiceLoader.load(DriverAdapterLoader.class)
)
name -> ServiceSelector.of(
name,
ServiceLoader.load(DriverAdapterLoader.class)
)
.get())
.map(
l -> l.load(
@ -143,7 +144,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap());
DriverAdapter<?,?> adapter = adapters.get(driverName);
DriverAdapter<?, ?> adapter = adapters.get(driverName);
adapterlist.add(adapter);
ParsedOp pop = new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this);
Optional<String> discard = pop.takeOptionalStaticValue("driver", String.class);
@ -166,24 +167,24 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e);
}
create().gauge(
create().gauge(
"ops_pending",
() -> this.getProgressMeter().getSummary().pending(),
MetricCategory.Core,
"The current number of operations which have not been dispatched for processing yet."
);
create().gauge(
() -> this.getProgressMeter().getSummary().pending(),
MetricCategory.Core,
"The current number of operations which have not been dispatched for processing yet."
);
create().gauge(
"ops_active",
() -> this.getProgressMeter().getSummary().current(),
MetricCategory.Core,
"The current number of operations which have been dispatched for processing, but which have not yet completed."
);
create().gauge(
() -> this.getProgressMeter().getSummary().current(),
MetricCategory.Core,
"The current number of operations which have been dispatched for processing, but which have not yet completed."
);
create().gauge(
"ops_complete",
() -> this.getProgressMeter().getSummary().complete(),
MetricCategory.Core,
"The current number of operations which have been completed"
);
() -> this.getProgressMeter().getSummary().complete(),
MetricCategory.Core,
"The current number of operations which have been completed"
);
}
@Override
@ -213,7 +214,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef);
for (DriverAdapter<?,?> adapter : adapters.values()) {
for (DriverAdapter<?, ?> adapter : adapters.values()) {
if (adapter instanceof NBReconfigurable configurable) {
NBConfigModel cfgModel = configurable.getReconfigModel();
NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams());
@ -244,7 +245,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
@Override
public List<OpTemplate> getSyntheticOpTemplates(OpsDocList opsDocList, Map<String, Object> cfg) {
List<OpTemplate> opTemplates = new ArrayList<>();
for (DriverAdapter<?,?> adapter : adapters.values()) {
for (DriverAdapter<?, ?> adapter : adapters.values()) {
if (adapter instanceof SyntheticOpTemplateProvider sotp) {
List<OpTemplate> newTemplates = sotp.getSyntheticOpTemplates(opsDocList, cfg);
opTemplates.addAll(newTemplates);
@ -260,7 +261,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
*/
@Override
public void shutdownActivity() {
for (Map.Entry<String, DriverAdapter<?,?>> entry : adapters.entrySet()) {
for (Map.Entry<String, DriverAdapter<?, ?>> entry : adapters.entrySet()) {
String adapterName = entry.getKey();
DriverAdapter<?, ?> adapter = entry.getValue();
adapter.getSpaceCache().getElements().forEach((spaceName, space) -> {
@ -284,7 +285,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
@Override
public void onEvent(NBEvent event) {
switch(event) {
switch (event) {
case ParamChange<?> pc -> {
switch (pc.value()) {
case SetThreads st -> activityDef.setThreads(st.threads);

View File

@ -46,4 +46,19 @@ public class SimRateSpecTest {
SimRateSpec c = new SimRateSpec("12345,1.1");
assertThat(c.verb).isEqualTo(SimRateSpec.Verb.start);
}
@Test
public void testScopeSelection() {
SimRateSpec asd = new SimRateSpec("12345,1.4");
assertThat(asd.getScope()).isEqualTo(SimRateSpec.Scope.activity);
SimRateSpec ts = new SimRateSpec("12345,1.4,start,thread");
assertThat(ts.getScope()).isEqualTo(SimRateSpec.Scope.thread);
SimRateSpec as = new SimRateSpec("12345,1.4,start,activity");
assertThat(as.getScope()).isEqualTo(SimRateSpec.Scope.activity);
SimRateSpec asa = new SimRateSpec("12345,1.4,activity");
assertThat(asa.getScope()).isEqualTo(SimRateSpec.Scope.activity);
SimRateSpec ast = new SimRateSpec("12345,1.4,thread");
assertThat(ast.getScope()).isEqualTo(SimRateSpec.Scope.thread);
}
}