Merge pull request #1928 from nosqlbench/nosqlbench-1323-tlrate

Implement thread-local scope for rate limiters and improve spec format
This commit is contained in:
Jonathan Shook 2024-04-16 10:29:44 -05:00 committed by GitHub
commit c2aa0bf877
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 247 additions and 136 deletions

View File

@ -74,7 +74,7 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
} }
@Override @Override
public NBComponent attachChild(NBComponent... children) { public synchronized NBComponent attachChild(NBComponent... children) {
for (NBComponent child : children) { for (NBComponent child : children) {
logger.debug(() -> "attaching " + child.description() + " to parent " + this.description()); logger.debug(() -> "attaching " + child.description() + " to parent " + this.description());

View File

@ -106,22 +106,6 @@ public interface Activity extends Comparable<Activity>, ActivityDefObserver, Pro
*/ */
RateLimiter getCycleLimiter(); RateLimiter getCycleLimiter();
/**
* Set the cycle rate limiter for this activity. This method should only
* be used non-concurrently. Otherwise, the supplier version
* {@link #getCycleRateLimiter(Supplier)} should be used.
* @param rateLimiter The cycle {@link RateLimiter} for this activity
*/
void setCycleLimiter(RateLimiter rateLimiter);
/**
* Get or create the cycle rate limiter in a safe way. Implementations
* should ensure that this method is synchronized or that each requester
* gets the same cycle rate limiter for the activity.
* @param supplier A {@link RateLimiter} {@link Supplier}
* @return An extant or newly created cycle {@link RateLimiter}
*/
RateLimiter getCycleRateLimiter(Supplier<? extends RateLimiter> supplier);
/** /**
* Get the current stride rate limiter for this activity. * Get the current stride rate limiter for this activity.
@ -131,23 +115,6 @@ public interface Activity extends Comparable<Activity>, ActivityDefObserver, Pro
*/ */
RateLimiter getStrideLimiter(); RateLimiter getStrideLimiter();
/**
* Set the stride rate limiter for this activity. This method should only
* be used non-concurrently. Otherwise, the supplier version
* {@link #getStrideRateLimiter(Supplier)}} should be used.
* @param rateLimiter The stride {@link RateLimiter} for this activity.
*/
void setStrideLimiter(RateLimiter rateLimiter);
/**
* Get or create the stride {@link RateLimiter} in a concurrent-safe
* way. Implementations should ensure that this method is synchronized or
* that each requester gets the same stride rate limiter for the activity.
* @param supplier A {@link RateLimiter} {@link Supplier}
* @return An extant or newly created stride {@link RateLimiter}
*/
RateLimiter getStrideRateLimiter(Supplier<? extends RateLimiter> supplier);
/** /**
* Get or create the instrumentation needed for this activity. This provides * Get or create the instrumentation needed for this activity. This provides
* a single place to find and manage, and document instrumentation that is * a single place to find and manage, and document instrumentation that is

View File

@ -79,7 +79,11 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
private long startTime; private long startTime;
public SimRate(NBComponent parent, SimRateSpec spec) { public SimRate(NBComponent parent, SimRateSpec spec) {
super(parent, NBLabels.forKV().and("rateType", this(parent, spec, NBLabels.forKV());
}
public SimRate(NBComponent parent, SimRateSpec spec, NBLabels extraLabels) {
super(parent, extraLabels.and("rateType",
(spec instanceof CycleRateSpec? "cycle" : "stride"))); (spec instanceof CycleRateSpec? "cycle" : "stride")));
this.spec = spec; this.spec = spec;
initMetrics(); initMetrics();

View File

@ -18,10 +18,15 @@ package io.nosqlbench.engine.api.activityapi.simrate;
import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap; import io.nosqlbench.nb.api.engine.activityimpl.ParameterMap;
import io.nosqlbench.nb.api.engine.util.Unit; import io.nosqlbench.nb.api.engine.util.Unit;
import io.nosqlbench.nb.api.errors.BasicError;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import java.time.temporal.ChronoUnit; import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/** /**
* <H2>Rate Limiter Specifications</H2> * <H2>Rate Limiter Specifications</H2>
@ -49,7 +54,6 @@ import java.time.temporal.ChronoUnit;
* </UL> * </UL>
* <p> * <p>
* Where: * Where:
*
* <EM>rate</EM> is the ops per second, expressed as any positive floating point value. * <EM>rate</EM> is the ops per second, expressed as any positive floating point value.
* <EM>burst ratio</EM> is a floating point value greater than 1.0 which determines how much faster * <EM>burst ratio</EM> is a floating point value greater than 1.0 which determines how much faster
* the rate limiter may go to catch up to the overall. * the rate limiter may go to catch up to the overall.
@ -129,7 +133,13 @@ public class SimRateSpec {
public static final double DEFAULT_BURST_RATIO = 1.1D; public static final double DEFAULT_BURST_RATIO = 1.1D;
public static Verb DEFAULT_VERB = Verb.start; public static Verb DEFAULT_VERB = Verb.start;
public enum Scope {
thread,
activity
}
public ChronoUnit unit; public ChronoUnit unit;
private Scope scope = Scope.activity;
/** /**
* Target rate in Operations Per Second * Target rate in Operations Per Second
@ -158,9 +168,9 @@ public class SimRateSpec {
// } // }
return switch (unit) { return switch (unit) {
case NANOS -> (int) newNanoTokens; case NANOS -> (int) newNanoTokens;
case MICROS -> (int) (newNanoTokens/1_000L); case MICROS -> (int) (newNanoTokens / 1_000L);
case MILLIS -> (int) (newNanoTokens/1_000_000L); case MILLIS -> (int) (newNanoTokens / 1_000_000L);
case SECONDS -> (int) (newNanoTokens/1_000_000_000L); case SECONDS -> (int) (newNanoTokens / 1_000_000_000L);
default -> throw new RuntimeException("invalid ChronoUnit for nanosToTicks:" + unit); default -> throw new RuntimeException("invalid ChronoUnit for nanosToTicks:" + unit);
}; };
} }
@ -168,9 +178,9 @@ public class SimRateSpec {
public long ticksToNanos(int newTicks) { public long ticksToNanos(int newTicks) {
return switch (unit) { return switch (unit) {
case NANOS -> newTicks; case NANOS -> newTicks;
case MICROS -> newTicks*1_000L; case MICROS -> newTicks * 1_000L;
case MILLIS -> newTicks*1_000_000L; case MILLIS -> newTicks * 1_000_000L;
case SECONDS -> newTicks*1_000_000_000L; case SECONDS -> newTicks * 1_000_000_000L;
default -> throw new RuntimeException("invalid ChronoUnit for ticksToNanos:" + unit); default -> throw new RuntimeException("invalid ChronoUnit for ticksToNanos:" + unit);
}; };
} }
@ -213,15 +223,25 @@ public class SimRateSpec {
this(opsPerSec, burstRatio, DEFAULT_VERB); this(opsPerSec, burstRatio, DEFAULT_VERB);
} }
public SimRateSpec(double opsPerSec, double burstRatio, Verb type) { public SimRateSpec(double opsPerSec, double burstRatio, Verb verb) {
apply(opsPerSec, burstRatio, verb); apply(opsPerSec, burstRatio, verb, Scope.activity);
} }
private void apply(double opsPerSec, double burstRatio, Verb verb) { public SimRateSpec(double opsPerSec, double burstRatio, Scope scope) {
apply(opsPerSec, burstRatio, DEFAULT_VERB, scope);
}
public SimRateSpec(double opsPerSec, double burstRatio, Verb verb, Scope scope) {
apply(opsPerSec, burstRatio, verb, scope);
}
private void apply(double opsPerSec, double burstRatio, Verb verb, Scope scope) {
this.opsPerSec = opsPerSec; this.opsPerSec = opsPerSec;
this.burstRatio = burstRatio; this.burstRatio = burstRatio;
this.verb = verb; this.verb = verb;
this.unit = chronoUnitFor(opsPerSec); this.unit = chronoUnitFor(opsPerSec);
this.scope = scope;
// TODO: include burst into ticks calculation // TODO: include burst into ticks calculation
} }
@ -245,25 +265,59 @@ public class SimRateSpec {
public SimRateSpec(String spec) { public SimRateSpec(String spec) {
String[] specs = spec.split("[,:;]"); String[] specs = spec.split("[,:;]");
Verb verb = Verb.start; int offset=0;
double burstRatio = DEFAULT_BURST_RATIO;
double opsPerSec; double opsPerSec;
switch (specs.length) { double burstRatio = DEFAULT_BURST_RATIO;
case 3: Verb verb = Verb.start;
verb = Verb.valueOf(specs[2].toLowerCase()); Scope scope = Scope.activity;
logger.debug("selected rate limiter type: " + verb); String oprateSpec = specs[offset++];
case 2: opsPerSec = Unit.doubleCountFor(oprateSpec).orElseThrow(() -> new RuntimeException("Unparsable:" + oprateSpec));
burstRatio = Double.valueOf(specs[1]); if (specs.length >= 2) {
if (burstRatio < 1.0) { try {
throw new RuntimeException("burst ratios less than 1.0 are invalid."); burstRatio = Double.parseDouble(specs[1]);
} offset++;
case 1: } catch (NumberFormatException ignored) {
opsPerSec = Unit.doubleCountFor(specs[0]).orElseThrow(() -> new RuntimeException("Unparsable:" + specs[0])); }
break;
default:
throw new RuntimeException("Rate specs must be either '<rate>' or '<rate>:<burstRatio>' as in 5000.0 or 5000.0:1.0");
} }
apply(opsPerSec, burstRatio, verb);
for (int i = offset; i < specs.length; i++) {
String specword = specs[i].toLowerCase();
try {
scope = Scope.valueOf(specword);
specword = null;
logger.debug("selected rate limiter scope: " + scope);
continue;
} catch (IllegalArgumentException ignored) {
}
try {
verb = Verb.valueOf(specword);
specword = null;
logger.debug("selected rate limiter type: " + verb);
continue;
} catch (IllegalArgumentException ignored) {
}
if (specword != null) {
String msg = """
Spec format 'SPECFORMAT' was not recognized for FORTYPE.
Use the format <ops/s>[,<burst ratio][,<verb>][,<scope>]
Examples:
100 (100 ops per second)
100,1.1 (with a burst ratio of 10% over)
100,start (start the rate limiter automatically)
100,thread (scope the rate limiter to each thread in an activity)
100,1.1,start,thread (all of the above)
Defaults: burst_ratio=1.1 verb=start scope=activity
"""
.replaceAll("SPECFORMAT", spec)
.replaceAll("FORTYPE", this.getClass().getSimpleName());
throw new BasicError(msg);
}
}
apply(opsPerSec, burstRatio, verb, scope);
} }
public String toString() { public String toString() {
@ -293,6 +347,8 @@ public class SimRateSpec {
SimRateSpec simRateSpec = (SimRateSpec) o; SimRateSpec simRateSpec = (SimRateSpec) o;
if (Double.compare(simRateSpec.opsPerSec, opsPerSec) != 0) return false; if (Double.compare(simRateSpec.opsPerSec, opsPerSec) != 0) return false;
if (verb!=simRateSpec.verb) return false;
if (scope!=simRateSpec.scope) return false;
return Double.compare(simRateSpec.burstRatio, burstRatio) == 0; return Double.compare(simRateSpec.burstRatio, burstRatio) == 0;
} }
@ -319,5 +375,9 @@ public class SimRateSpec {
return this.verb == Verb.restart; return this.verb == Verb.restart;
} }
public Scope getScope() {
return this.scope;
}
} }

View File

@ -0,0 +1,56 @@
/*
* Copyright (c) 2022-2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.api.activityapi.simrate;
import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.labels.NBLabels;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.Supplier;
public class ThreadLocalRateLimiters {
private static final Logger logger = LogManager.getLogger(ThreadLocalRateLimiters.class);
public static synchronized ThreadLocal<RateLimiter> createOrUpdate(
final NBComponent parent,
final ThreadLocal<RateLimiter> extantSource,
final SimRateSpec spec
) {
if (extantSource != null) {
RateLimiter rl = extantSource.get();
rl.applyRateSpec(spec);
return extantSource;
} else {
Supplier<RateLimiter> rls;
rls = switch (spec.getScope()) {
case activity -> {
SimRate rl = new SimRate(parent, spec);
yield () -> rl;
}
case thread -> () -> new SimRate(
parent,
spec,
NBLabels.forKV("thread", Thread.currentThread().getName())
);
};
return ThreadLocal.withInitial(rls);
}
}
}

View File

@ -18,6 +18,7 @@ package io.nosqlbench.engine.api.activityimpl;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.EmitterOpDispenserWrapper; import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.EmitterOpDispenserWrapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp; import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.engine.api.activityapi.simrate.*;
import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult; import io.nosqlbench.engine.core.lifecycle.scenario.container.InvokableResult;
import io.nosqlbench.nb.api.components.core.NBComponent; import io.nosqlbench.nb.api.components.core.NBComponent;
import io.nosqlbench.nb.api.components.events.ParamChange; import io.nosqlbench.nb.api.components.events.ParamChange;
@ -28,9 +29,6 @@ import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics;
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler; import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence; import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityapi.planning.SequencerType; import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
import io.nosqlbench.engine.api.activityapi.simrate.RateLimiters;
import io.nosqlbench.engine.api.activityapi.simrate.CycleRateSpec;
import io.nosqlbench.engine.api.activityapi.simrate.SimRateSpec;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser; import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper; import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.nb.api.components.status.NBStatusComponent; import io.nosqlbench.nb.api.components.status.NBStatusComponent;
@ -42,12 +40,10 @@ import io.nosqlbench.engine.api.activityapi.cyclelog.filters.IntPredicateDispens
import io.nosqlbench.engine.api.activityapi.input.InputDispenser; import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
import io.nosqlbench.engine.api.activityapi.output.OutputDispenser; import io.nosqlbench.engine.api.activityapi.output.OutputDispenser;
import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner; import io.nosqlbench.engine.api.activityapi.planning.SequencePlanner;
import io.nosqlbench.engine.api.activityapi.simrate.RateLimiter;
import io.nosqlbench.adapters.api.activityconfig.OpsLoader; import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate; import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat; import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat;
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList; import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
import io.nosqlbench.engine.api.activityapi.simrate.StrideRateSpec;
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally; import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.DryRunOpDispenserWrapper; import io.nosqlbench.adapters.api.activityimpl.uniform.opwrappers.DryRunOpDispenserWrapper;
@ -78,8 +74,8 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
private OutputDispenser markerDispenser; private OutputDispenser markerDispenser;
private IntPredicateDispenser resultFilterDispenser; private IntPredicateDispenser resultFilterDispenser;
private RunState runState = RunState.Uninitialized; private RunState runState = RunState.Uninitialized;
private RateLimiter strideLimiter; private ThreadLocal<RateLimiter> strideLimiterSource;
private RateLimiter cycleLimiter; private ThreadLocal<RateLimiter> cycleLimiterSource;
private ActivityInstrumentation activityInstrumentation; private ActivityInstrumentation activityInstrumentation;
private PrintWriter console; private PrintWriter console;
private long startedAtMillis; private long startedAtMillis;
@ -229,41 +225,21 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
@Override @Override
public RateLimiter getCycleLimiter() { public RateLimiter getCycleLimiter() {
return this.cycleLimiter; if (cycleLimiterSource!=null) {
} return cycleLimiterSource.get();
} else {
@Override return null;
public synchronized void setCycleLimiter(RateLimiter rateLimiter) {
this.cycleLimiter = rateLimiter;
}
@Override
public synchronized RateLimiter getCycleRateLimiter(Supplier<? extends RateLimiter> s) {
if (null == this.cycleLimiter) {
cycleLimiter = s.get();
} }
return cycleLimiter;
} }
@Override @Override
public synchronized RateLimiter getStrideLimiter() { public synchronized RateLimiter getStrideLimiter() {
return this.strideLimiter; if (strideLimiterSource!=null) {
} return strideLimiterSource.get();
} else {
@Override return null;
public synchronized void setStrideLimiter(RateLimiter rateLimiter) {
this.strideLimiter = rateLimiter;
}
@Override
public synchronized RateLimiter getStrideRateLimiter(Supplier<? extends RateLimiter> s) {
if (null == this.strideLimiter) {
strideLimiter = s.get();
} }
return strideLimiter;
} }
@Override @Override
public synchronized ActivityInstrumentation getInstrumentation() { public synchronized ActivityInstrumentation getInstrumentation() {
if (null == this.activityInstrumentation) { if (null == this.activityInstrumentation) {
@ -306,6 +282,8 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) { public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) {
// cycleratePerThread = activityDef.getParams().takeBoolOrDefault("cyclerate_per_thread", false);
activityDef.getParams().getOptionalNamedParameter("striderate") activityDef.getParams().getOptionalNamedParameter("striderate")
.map(StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr))); .map(StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
@ -315,13 +293,14 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
} }
public void createOrUpdateStrideLimiter(SimRateSpec spec) { public void createOrUpdateStrideLimiter(SimRateSpec spec) {
strideLimiter = RateLimiters.createOrUpdate(this, strideLimiter, spec); strideLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, strideLimiterSource, spec);
} }
public void createOrUpdateCycleLimiter(SimRateSpec spec) { public void createOrUpdateCycleLimiter(SimRateSpec spec) {
cycleLimiter = RateLimiters.createOrUpdate(this, cycleLimiter, spec); cycleLimiterSource = ThreadLocalRateLimiters.createOrUpdate(this, cycleLimiterSource, spec);
} }
/** /**
* Modify the provided ActivityDef with defaults for stride and cycles, if they haven't been provided, based on the * Modify the provided ActivityDef with defaults for stride and cycles, if they haven't been provided, based on the
* length of the sequence as determined by the provided ratios. Also, modify the ActivityDef with reasonable * length of the sequence as determined by the provided ratios. Also, modify the ActivityDef with reasonable
@ -344,10 +323,7 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
if (cyclesOpt.isEmpty()) { if (cyclesOpt.isEmpty()) {
String cycles = getParams().getOptionalString("stride").orElseThrow(); String cycles = getParams().getOptionalString("stride").orElseThrow();
logger.info(() -> "defaulting cycles to " + cycles + " (the stride length)"); logger.info(() -> "defaulting cycles to " + cycles + " (the stride length)");
// getParams().set("cycles", getParams().getOptionalString("stride").orElseThrow());
// getParams().setSilently("cycles", getParams().getOptionalString("stride").orElseThrow());
this.getActivityDef().setCycles(getParams().getOptionalString("stride").orElseThrow()); this.getActivityDef().setCycles(getParams().getOptionalString("stride").orElseThrow());
// getParams().set("cycles", getParams().getOptionalString("stride").orElseThrow());
} else { } else {
if (0 == activityDef.getCycleCount()) { if (0 == activityDef.getCycleCount()) {
throw new RuntimeException( throw new RuntimeException(
@ -680,4 +656,22 @@ public class SimpleActivity extends NBStatusComponent implements Activity, Invok
public Map<String, String> asResult() { public Map<String, String> asResult() {
return Map.of("activity",this.getAlias()); return Map.of("activity",this.getAlias());
} }
// private final ThreadLocal<RateLimiter> cycleLimiterThreadLocal = ThreadLocal.withInitial(() -> {
// RateLimiters.createOrUpdate(this,null,new SimRateSpec()
// if (cycleratePerThread) {
// return RateLimiters.createOrUpdate(new NBThreadComponent(this),null,)
// } else {
// RateLimiters.createOrUpdate(new NBThreadComponent(this),null,activityDef)
// }
// if (getCycleLimiter() != null) {
// return RateLimiters.createOrUpdate(
// new NBThreadComponent(this),
// getCycleLimiter(),
// getCycleLimiter().getSpec());
// } else {
// return null;
// }
// });
} }

View File

@ -54,16 +54,18 @@ import java.util.concurrent.ConcurrentHashMap;
* core of all new activity types. Extant NB drivers should also migrate * core of all new activity types. Extant NB drivers should also migrate
* to this when possible. * to this when possible.
* *
* @param <R> A type of runnable which wraps the operations for this type of driver. * @param <R>
* @param <S> The context type for the activity, AKA the 'space' for a named driver instance and its associated object graph * A type of runnable which wraps the operations for this type of driver.
* @param <S>
* The context type for the activity, AKA the 'space' for a named driver instance and its associated object graph
*/ */
public class StandardActivity<R extends Op, S> extends SimpleActivity implements SyntheticOpTemplateProvider, ActivityDefObserver { public class StandardActivity<R extends Op, S> extends SimpleActivity implements SyntheticOpTemplateProvider, ActivityDefObserver {
private static final Logger logger = LogManager.getLogger("ACTIVITY"); private static final Logger logger = LogManager.getLogger("ACTIVITY");
private final OpSequence<OpDispenser<? extends Op>> sequence; private final OpSequence<OpDispenser<? extends Op>> sequence;
private final ConcurrentHashMap<String, DriverAdapter<?,?>> adapters = new ConcurrentHashMap<>(); private final ConcurrentHashMap<String, DriverAdapter<?, ?>> adapters = new ConcurrentHashMap<>();
public StandardActivity(NBComponent parent, ActivityDef activityDef) { public StandardActivity(NBComponent parent, ActivityDef activityDef) {
super(parent,activityDef); super(parent, activityDef);
OpsDocList workload; OpsDocList workload;
Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload"); Optional<String> yaml_loc = activityDef.getParams().getOptionalString("yaml", "workload");
@ -77,9 +79,9 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
} }
Optional<String> defaultDriverName = activityDef.getParams().getOptionalString("driver"); Optional<String> defaultDriverName = activityDef.getParams().getOptionalString("driver");
Optional<DriverAdapter<?,?>> defaultAdapter = defaultDriverName Optional<DriverAdapter<?, ?>> defaultAdapter = defaultDriverName
.flatMap(name -> ServiceSelector.of(name,ServiceLoader.load(DriverAdapterLoader.class)).get()) .flatMap(name -> ServiceSelector.of(name, ServiceLoader.load(DriverAdapterLoader.class)).get())
.map(l -> l.load(this,NBLabels.forKV())); .map(l -> l.load(this, NBLabels.forKV()));
if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) { if (defaultDriverName.isPresent() && defaultAdapter.isEmpty()) {
throw new BasicError("Unable to load default driver adapter '" + defaultDriverName.get() + '\''); throw new BasicError("Unable to load default driver adapter '" + defaultDriverName.get() + '\'');
@ -90,7 +92,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
List<ParsedOp> pops = new ArrayList<>(); List<ParsedOp> pops = new ArrayList<>();
List<DriverAdapter<?,?>> adapterlist = new ArrayList<>(); List<DriverAdapter<?, ?>> adapterlist = new ArrayList<>();
NBConfigModel supersetConfig = ConfigModel.of(StandardActivity.class).add(yamlmodel); NBConfigModel supersetConfig = ConfigModel.of(StandardActivity.class).add(yamlmodel);
Optional<String> defaultDriverOption = defaultDriverName; Optional<String> defaultDriverOption = defaultDriverName;
@ -107,16 +109,15 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
// .orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot)); // .orElseThrow(() -> new OpConfigError("Unable to identify driver name for op template:\n" + ot));
// HERE // HERE
if (!adapters.containsKey(driverName)) { if (!adapters.containsKey(driverName)) {
DriverAdapter<?,?> adapter = Optional.of(driverName) DriverAdapter<?, ?> adapter = Optional.of(driverName)
.flatMap( .flatMap(
name -> ServiceSelector.of( name -> ServiceSelector.of(
name, name,
ServiceLoader.load(DriverAdapterLoader.class) ServiceLoader.load(DriverAdapterLoader.class)
) )
.get()) .get())
.map( .map(
l -> l.load( l -> l.load(
@ -143,7 +144,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap()); supersetConfig.assertValidConfig(activityDef.getParams().getStringStringMap());
DriverAdapter<?,?> adapter = adapters.get(driverName); DriverAdapter<?, ?> adapter = adapters.get(driverName);
adapterlist.add(adapter); adapterlist.add(adapter);
ParsedOp pop = new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this); ParsedOp pop = new ParsedOp(ot, adapter.getConfiguration(), List.of(adapter.getPreprocessor()), this);
Optional<String> discard = pop.takeOptionalStaticValue("driver", String.class); Optional<String> discard = pop.takeOptionalStaticValue("driver", String.class);
@ -166,24 +167,24 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e); throw new OpConfigError("Error mapping workload template to operations: " + e.getMessage(), null, e);
} }
create().gauge( create().gauge(
"ops_pending", "ops_pending",
() -> this.getProgressMeter().getSummary().pending(), () -> this.getProgressMeter().getSummary().pending(),
MetricCategory.Core, MetricCategory.Core,
"The current number of operations which have not been dispatched for processing yet." "The current number of operations which have not been dispatched for processing yet."
); );
create().gauge( create().gauge(
"ops_active", "ops_active",
() -> this.getProgressMeter().getSummary().current(), () -> this.getProgressMeter().getSummary().current(),
MetricCategory.Core, MetricCategory.Core,
"The current number of operations which have been dispatched for processing, but which have not yet completed." "The current number of operations which have been dispatched for processing, but which have not yet completed."
); );
create().gauge( create().gauge(
"ops_complete", "ops_complete",
() -> this.getProgressMeter().getSummary().complete(), () -> this.getProgressMeter().getSummary().complete(),
MetricCategory.Core, MetricCategory.Core,
"The current number of operations which have been completed" "The current number of operations which have been completed"
); );
} }
@Override @Override
@ -213,7 +214,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
public synchronized void onActivityDefUpdate(ActivityDef activityDef) { public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
super.onActivityDefUpdate(activityDef); super.onActivityDefUpdate(activityDef);
for (DriverAdapter<?,?> adapter : adapters.values()) { for (DriverAdapter<?, ?> adapter : adapters.values()) {
if (adapter instanceof NBReconfigurable configurable) { if (adapter instanceof NBReconfigurable configurable) {
NBConfigModel cfgModel = configurable.getReconfigModel(); NBConfigModel cfgModel = configurable.getReconfigModel();
NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams()); NBConfiguration cfg = cfgModel.matchConfig(activityDef.getParams());
@ -244,7 +245,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
@Override @Override
public List<OpTemplate> getSyntheticOpTemplates(OpsDocList opsDocList, Map<String, Object> cfg) { public List<OpTemplate> getSyntheticOpTemplates(OpsDocList opsDocList, Map<String, Object> cfg) {
List<OpTemplate> opTemplates = new ArrayList<>(); List<OpTemplate> opTemplates = new ArrayList<>();
for (DriverAdapter<?,?> adapter : adapters.values()) { for (DriverAdapter<?, ?> adapter : adapters.values()) {
if (adapter instanceof SyntheticOpTemplateProvider sotp) { if (adapter instanceof SyntheticOpTemplateProvider sotp) {
List<OpTemplate> newTemplates = sotp.getSyntheticOpTemplates(opsDocList, cfg); List<OpTemplate> newTemplates = sotp.getSyntheticOpTemplates(opsDocList, cfg);
opTemplates.addAll(newTemplates); opTemplates.addAll(newTemplates);
@ -260,7 +261,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
*/ */
@Override @Override
public void shutdownActivity() { public void shutdownActivity() {
for (Map.Entry<String, DriverAdapter<?,?>> entry : adapters.entrySet()) { for (Map.Entry<String, DriverAdapter<?, ?>> entry : adapters.entrySet()) {
String adapterName = entry.getKey(); String adapterName = entry.getKey();
DriverAdapter<?, ?> adapter = entry.getValue(); DriverAdapter<?, ?> adapter = entry.getValue();
adapter.getSpaceCache().getElements().forEach((spaceName, space) -> { adapter.getSpaceCache().getElements().forEach((spaceName, space) -> {
@ -284,7 +285,7 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
@Override @Override
public void onEvent(NBEvent event) { public void onEvent(NBEvent event) {
switch(event) { switch (event) {
case ParamChange<?> pc -> { case ParamChange<?> pc -> {
switch (pc.value()) { switch (pc.value()) {
case SetThreads st -> activityDef.setThreads(st.threads); case SetThreads st -> activityDef.setThreads(st.threads);

View File

@ -46,4 +46,33 @@ public class SimRateSpecTest {
SimRateSpec c = new SimRateSpec("12345,1.1"); SimRateSpec c = new SimRateSpec("12345,1.1");
assertThat(c.verb).isEqualTo(SimRateSpec.Verb.start); assertThat(c.verb).isEqualTo(SimRateSpec.Verb.start);
} }
@Test
public void testVariantFormats() {
assertThat(new SimRateSpec("12345"))
.isEqualTo(new SimRateSpec(
12345.0d, 1.1d, SimRateSpec.Verb.start, SimRateSpec.Scope.activity
));
assertThat(new SimRateSpec("12345,1.4"))
.isEqualTo(new SimRateSpec(
12345.0d, 1.4d, SimRateSpec.Verb.start, SimRateSpec.Scope.activity
));
assertThat(new SimRateSpec("12345,configure"))
.isEqualTo(new SimRateSpec(
12345.0d, 1.1d, SimRateSpec.Verb.configure, SimRateSpec.Scope.activity
));
assertThat(new SimRateSpec("12345,thread"))
.isEqualTo(new SimRateSpec(
12345.0d, 1.1d, SimRateSpec.Verb.start, SimRateSpec.Scope.thread
));
assertThat(new SimRateSpec("12345,1.4,thread"))
.isEqualTo(new SimRateSpec(
12345.0d, 1.4d, SimRateSpec.Verb.start, SimRateSpec.Scope.thread
));
assertThat(new SimRateSpec("12345,configure,activity"))
.isEqualTo(new SimRateSpec(
12345.0d, 1.1d, SimRateSpec.Verb.configure, SimRateSpec.Scope.activity
));
}
} }