mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
refactor activity specific signatures into faceted naming interface
This commit is contained in:
parent
355d2fb19f
commit
f699b4f83f
@ -0,0 +1,28 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapter.diag.types;
|
||||
|
||||
public interface OpNameAware {
|
||||
void setOpName(String opname);
|
||||
static void apply(String opname, Object... awares) {
|
||||
for (Object aware : awares) {
|
||||
if (aware instanceof OpNameAware opNameAware) {
|
||||
opNameAware.setOpName(opname);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -37,11 +37,11 @@ public class ThreadLocalNamedTimers {
|
||||
private final static Map<String, Timer> timers = new HashMap<>();
|
||||
private final Map<String, Timer.Context> contexts = new HashMap<>();
|
||||
|
||||
public static void addTimer(ActivityDef def, String name) {
|
||||
public static void addTimer(ActivityDef def, String name, int hdrdigits) {
|
||||
if (timers.containsKey("name")) {
|
||||
logger.warn("A timer named '" + name + "' was already defined and initialized.");
|
||||
}
|
||||
Timer timer = ActivityMetrics.timer(def, name);
|
||||
Timer timer = ActivityMetrics.timer(def, name, hdrdigits);
|
||||
timers.put(name, timer);
|
||||
}
|
||||
|
||||
|
@ -50,7 +50,7 @@ public class DiagActivity extends SimpleActivity implements Activity, ActivityDe
|
||||
@Override
|
||||
public void initActivity() {
|
||||
super.initActivity();
|
||||
delayHistogram = ActivityMetrics.histogram(activityDef, "diagdelay");
|
||||
delayHistogram = ActivityMetrics.histogram(activityDef, "diagdelay", this.getHdrDigits());
|
||||
Integer initdelay = activityDef.getParams().getOptionalInteger("initdelay").orElse(0);
|
||||
try {
|
||||
Thread.sleep(initdelay);
|
||||
|
@ -67,14 +67,14 @@ public class HttpActivity extends SimpleActivity implements Activity, ActivityDe
|
||||
public void initActivity() {
|
||||
super.initActivity();
|
||||
|
||||
bindTimer = ActivityMetrics.timer(activityDef, "bind");
|
||||
executeTimer = ActivityMetrics.timer(activityDef, "execute");
|
||||
resultTimer = ActivityMetrics.timer(activityDef, "result");
|
||||
triesHisto = ActivityMetrics.histogram(activityDef, "tries");
|
||||
bindTimer = ActivityMetrics.timer(activityDef, "bind",this.getHdrDigits());
|
||||
executeTimer = ActivityMetrics.timer(activityDef, "execute", this.getHdrDigits());
|
||||
resultTimer = ActivityMetrics.timer(activityDef, "result", this.getHdrDigits());
|
||||
triesHisto = ActivityMetrics.histogram(activityDef, "tries", this.getHdrDigits());
|
||||
rowCounter = ActivityMetrics.meter(activityDef, "rows");
|
||||
statusCodeHisto = ActivityMetrics.histogram(activityDef, "statuscode");
|
||||
skippedTokens = ActivityMetrics.histogram(activityDef, "skipped-tokens");
|
||||
resultSuccessTimer = ActivityMetrics.timer(activityDef, "result-success");
|
||||
statusCodeHisto = ActivityMetrics.histogram(activityDef, "statuscode",this.getHdrDigits());
|
||||
skippedTokens = ActivityMetrics.histogram(activityDef, "skipped-tokens",this.getHdrDigits());
|
||||
resultSuccessTimer = ActivityMetrics.timer(activityDef, "result-success", this.getHdrDigits());
|
||||
this.sequencer = createOpSequence(ReadyHttpOp::new, false);
|
||||
setDefaultsFromOpSequence(sequencer);
|
||||
onActivityDefUpdate(activityDef);
|
||||
|
@ -83,10 +83,10 @@ public abstract class JDBCActivity extends SimpleActivity {
|
||||
@Override
|
||||
public void initActivity() {
|
||||
LOGGER.debug("initializing activity: " + getActivityDef().getAlias());
|
||||
bindTimer = ActivityMetrics.timer(getActivityDef(), "bind");
|
||||
resultTimer = ActivityMetrics.timer(getActivityDef(), "result");
|
||||
resultSuccessTimer = ActivityMetrics.timer(getActivityDef(), "result-success");
|
||||
triesHisto = ActivityMetrics.histogram(getActivityDef(), "tries");
|
||||
bindTimer = ActivityMetrics.timer(getActivityDef(), "bind", this.getHdrDigits());
|
||||
resultTimer = ActivityMetrics.timer(getActivityDef(), "result", this.getHdrDigits());
|
||||
resultSuccessTimer = ActivityMetrics.timer(getActivityDef(), "result-success", this.getHdrDigits());
|
||||
triesHisto = ActivityMetrics.histogram(getActivityDef(), "tries", this.getHdrDigits());
|
||||
|
||||
opSequence = createOpSequence(ReadyJDBCOp::new,false);
|
||||
setDefaultsFromOpSequence(opSequence);
|
||||
|
@ -108,10 +108,10 @@ public class JmsActivity extends SimpleActivity {
|
||||
factory = new PulsarConnectionFactory(jmsConnInfo.getJmsConnConfig());
|
||||
this.jmsContext = factory.createContext();
|
||||
|
||||
bindTimer = ActivityMetrics.timer(activityDef, "bind");
|
||||
executeTimer = ActivityMetrics.timer(activityDef, "execute");
|
||||
bindTimer = ActivityMetrics.timer(activityDef, "bind", this.getHdrDigits());
|
||||
executeTimer = ActivityMetrics.timer(activityDef, "execute", this.getHdrDigits());
|
||||
bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
|
||||
messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize");
|
||||
messagesizeHistogram = ActivityMetrics.histogram(activityDef, "messagesize", this.getHdrDigits());
|
||||
|
||||
if (StringUtils.equalsIgnoreCase(jmsProviderType, JmsUtil.JMS_PROVIDER_TYPES.PULSAR.label )) {
|
||||
this.sequence = createOpSequence((ot) -> new ReadyPulsarJmsOp(ot, this), false);
|
||||
|
@ -75,8 +75,8 @@ public class KafkaProducerActivity extends SimpleActivity {
|
||||
opSequence = initOpSequencer();
|
||||
setDefaultsFromOpSequence(opSequence);
|
||||
|
||||
resultTimer = ActivityMetrics.timer(activityDef, "result");
|
||||
resultSuccessTimer = ActivityMetrics.timer(activityDef, "result-success");
|
||||
resultTimer = ActivityMetrics.timer(activityDef, "result", this.getHdrDigits());
|
||||
resultSuccessTimer = ActivityMetrics.timer(activityDef, "result-success", this.getHdrDigits());
|
||||
}
|
||||
|
||||
private OpSequence<KafkaStatement> initOpSequencer() {
|
||||
|
@ -98,11 +98,11 @@ public class MongoActivity extends SimpleActivity implements ActivityDefObserver
|
||||
mongoDatabase = client.getDatabase(databaseName);
|
||||
showQuery = activityDef.getParams().getOptionalBoolean("showquery")
|
||||
.orElse(false);
|
||||
bindTimer = ActivityMetrics.timer(activityDef, "bind");
|
||||
resultTimer = ActivityMetrics.timer(activityDef, "result");
|
||||
resultSuccessTimer = ActivityMetrics.timer(activityDef, "result-success");
|
||||
resultSetSizeHisto = ActivityMetrics.histogram(activityDef, "resultset-size");
|
||||
triesHisto = ActivityMetrics.histogram(activityDef, "tries");
|
||||
bindTimer = ActivityMetrics.timer(activityDef, "bind", this.getHdrDigits());
|
||||
resultTimer = ActivityMetrics.timer(activityDef, "result", this.getHdrDigits());
|
||||
resultSuccessTimer = ActivityMetrics.timer(activityDef, "result-success", this.getHdrDigits());
|
||||
resultSetSizeHisto = ActivityMetrics.histogram(activityDef, "resultset-size", this.getHdrDigits());
|
||||
triesHisto = ActivityMetrics.histogram(activityDef, "tries", this.getHdrDigits());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -110,14 +110,14 @@ public class PulsarActivity extends SimpleActivity implements ActivityDefObserve
|
||||
pulsarCache = new PulsarSpaceCache(this);
|
||||
|
||||
bytesCounter = ActivityMetrics.counter(activityDef, "bytes");
|
||||
messageSizeHistogram = ActivityMetrics.histogram(activityDef, "message_size");
|
||||
bindTimer = ActivityMetrics.timer(activityDef, "bind");
|
||||
executeTimer = ActivityMetrics.timer(activityDef, "execute");
|
||||
createTransactionTimer = ActivityMetrics.timer(activityDef, "create_transaction");
|
||||
commitTransactionTimer = ActivityMetrics.timer(activityDef, "commit_transaction");
|
||||
messageSizeHistogram = ActivityMetrics.histogram(activityDef, "message_size", this.getHdrDigits());
|
||||
bindTimer = ActivityMetrics.timer(activityDef, "bind", this.getHdrDigits());
|
||||
executeTimer = ActivityMetrics.timer(activityDef, "execute", this.getHdrDigits());
|
||||
createTransactionTimer = ActivityMetrics.timer(activityDef, "create_transaction", this.getHdrDigits());
|
||||
commitTransactionTimer = ActivityMetrics.timer(activityDef, "commit_transaction", this.getHdrDigits());
|
||||
|
||||
e2eMsgProcLatencyHistogram = ActivityMetrics.histogram(activityDef, "e2e_msg_latency");
|
||||
payloadRttHistogram = ActivityMetrics.histogram(activityDef, "payload_rtt");
|
||||
e2eMsgProcLatencyHistogram = ActivityMetrics.histogram(activityDef, "e2e_msg_latency", this.getHdrDigits());
|
||||
payloadRttHistogram = ActivityMetrics.histogram(activityDef, "payload_rtt", this.getHdrDigits());
|
||||
|
||||
msgErrOutOfSeqCounter = ActivityMetrics.counter(activityDef, "err_msg_oos");
|
||||
msgErrLossCounter = ActivityMetrics.counter(activityDef, "err_msg_loss");
|
||||
|
@ -17,6 +17,7 @@
|
||||
package io.nosqlbench.engine.api.activityapi.core;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.api.NBNamedElement;
|
||||
import io.nosqlbench.engine.api.activityapi.cyclelog.filters.IntPredicateDispenser;
|
||||
import io.nosqlbench.engine.api.activityapi.errorhandling.ErrorMetrics;
|
||||
import io.nosqlbench.engine.api.activityapi.input.InputDispenser;
|
||||
@ -37,7 +38,7 @@ import java.util.function.Supplier;
|
||||
* Provides the components needed to build and run an activity a runtime.
|
||||
* The easiest way to build a useful Activity is to extend {@link SimpleActivity}.
|
||||
*/
|
||||
public interface Activity extends Comparable<Activity>, ActivityDefObserver, ProgressCapable, StateCapable {
|
||||
public interface Activity extends Comparable<Activity>, ActivityDefObserver, ProgressCapable, StateCapable, NBNamedElement {
|
||||
|
||||
/**
|
||||
* Provide the activity with the controls needed to stop itself.
|
||||
@ -211,4 +212,8 @@ public interface Activity extends Comparable<Activity>, ActivityDefObserver, Pro
|
||||
}
|
||||
|
||||
int getMaxTries();
|
||||
|
||||
default int getHdrDigits() {
|
||||
return getParams().getOptionalInteger("hdr_digits").orElse(4);
|
||||
}
|
||||
}
|
||||
|
@ -49,13 +49,13 @@ public class CoreActivityInstrumentation implements ActivityInstrumentation {
|
||||
@Override
|
||||
public synchronized Timer getOrCreateInputTimer() {
|
||||
String metricName = "read_input";
|
||||
return ActivityMetrics.timer(def, metricName);
|
||||
return ActivityMetrics.timer(def, metricName, activity.getHdrDigits());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized Timer getOrCreateStridesServiceTimer() {
|
||||
return ActivityMetrics.timer(def, "strides" + SERVICE_TIME);
|
||||
return ActivityMetrics.timer(def, "strides" + SERVICE_TIME, activity.getHdrDigits());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -63,13 +63,13 @@ public class CoreActivityInstrumentation implements ActivityInstrumentation {
|
||||
if (activity.getStrideLimiter()==null) {
|
||||
return null;
|
||||
}
|
||||
return ActivityMetrics.timer(def, "strides" + RESPONSE_TIME);
|
||||
return ActivityMetrics.timer(def, "strides" + RESPONSE_TIME, activity.getHdrDigits());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized Timer getOrCreateCyclesServiceTimer() {
|
||||
return ActivityMetrics.timer(def, "cycles" + svcTimeSuffix);
|
||||
return ActivityMetrics.timer(def, "cycles" + svcTimeSuffix, activity.getHdrDigits());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -78,20 +78,20 @@ public class CoreActivityInstrumentation implements ActivityInstrumentation {
|
||||
return null;
|
||||
}
|
||||
String metricName = "cycles" + RESPONSE_TIME;
|
||||
return ActivityMetrics.timer(def, metricName);
|
||||
return ActivityMetrics.timer(def, metricName, activity.getHdrDigits());
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public synchronized Timer getOrCreatePhasesServiceTimer() {
|
||||
return ActivityMetrics.timer(def, "phases" + SERVICE_TIME);
|
||||
return ActivityMetrics.timer(def, "phases" + SERVICE_TIME, activity.getHdrDigits());
|
||||
}
|
||||
@Override
|
||||
public synchronized Timer getPhasesResponseTimerOrNull() {
|
||||
if (activity.getPhaseLimiter()==null) {
|
||||
return null;
|
||||
}
|
||||
return ActivityMetrics.timer(def,"phases" + RESPONSE_TIME);
|
||||
return ActivityMetrics.timer(def,"phases" + RESPONSE_TIME, activity.getHdrDigits());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -108,26 +108,26 @@ public class CoreActivityInstrumentation implements ActivityInstrumentation {
|
||||
|
||||
@Override
|
||||
public synchronized Timer getOrCreateBindTimer() {
|
||||
return ActivityMetrics.timer(def, "bind");
|
||||
return ActivityMetrics.timer(def, "bind", activity.getHdrDigits());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Timer getOrCreateExecuteTimer() {
|
||||
return ActivityMetrics.timer(def,"execute");
|
||||
return ActivityMetrics.timer(def,"execute", activity.getHdrDigits());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Timer getOrCreateResultTimer() {
|
||||
return ActivityMetrics.timer(def,"result");
|
||||
return ActivityMetrics.timer(def,"result", activity.getHdrDigits());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Timer getOrCreateResultSuccessTimer() {
|
||||
return ActivityMetrics.timer(def,"result-success");
|
||||
return ActivityMetrics.timer(def,"result-success", activity.getHdrDigits());
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized Histogram getOrCreateTriesHistogram() {
|
||||
return ActivityMetrics.histogram(def,"tries");
|
||||
return ActivityMetrics.histogram(def,"tries", activity.getHdrDigits());
|
||||
}
|
||||
}
|
||||
|
@ -17,8 +17,8 @@
|
||||
package io.nosqlbench.engine.api.activityapi.ratelimits;
|
||||
|
||||
import com.codahale.metrics.Gauge;
|
||||
import io.nosqlbench.api.NBNamedElement;
|
||||
import io.nosqlbench.engine.api.activityapi.core.Startable;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
|
||||
import io.nosqlbench.nb.annotations.Service;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -80,6 +80,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
public class HybridRateLimiter implements Startable, RateLimiter {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(HybridRateLimiter.class);
|
||||
private NBNamedElement named;
|
||||
|
||||
//private volatile TokenFiller filler;
|
||||
private volatile long starttime;
|
||||
@ -88,7 +89,6 @@ public class HybridRateLimiter implements Startable, RateLimiter {
|
||||
private RateSpec rateSpec;
|
||||
|
||||
// basic state
|
||||
private ActivityDef activityDef;
|
||||
private String label;
|
||||
private State state = State.Idle;
|
||||
// metrics
|
||||
@ -104,10 +104,10 @@ public class HybridRateLimiter implements Startable, RateLimiter {
|
||||
protected HybridRateLimiter() {
|
||||
}
|
||||
|
||||
public HybridRateLimiter(ActivityDef def, String label, RateSpec rateSpec) {
|
||||
setActivityDef(def);
|
||||
public HybridRateLimiter(NBNamedElement named, String label, RateSpec rateSpec) {
|
||||
setLabel(label);
|
||||
init(activityDef);
|
||||
init(named);
|
||||
this.named = named;
|
||||
this.applyRateSpec(rateSpec);
|
||||
}
|
||||
|
||||
@ -115,10 +115,6 @@ public class HybridRateLimiter implements Startable, RateLimiter {
|
||||
this.label = label;
|
||||
}
|
||||
|
||||
protected void setActivityDef(ActivityDef def) {
|
||||
this.activityDef = def;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long maybeWaitForOp() {
|
||||
return tokens.blockAndTake();
|
||||
@ -151,7 +147,7 @@ public class HybridRateLimiter implements Startable, RateLimiter {
|
||||
}
|
||||
|
||||
this.rateSpec = updatingRateSpec;
|
||||
this.tokens = (this.tokens == null) ? new ThreadDrivenTokenPool(rateSpec, activityDef) : this.tokens.apply(rateSpec);
|
||||
this.tokens = (this.tokens == null) ? new ThreadDrivenTokenPool(rateSpec, named) : this.tokens.apply(named, rateSpec);
|
||||
// this.filler = (this.filler == null) ? new TokenFiller(rateSpec, activityDef) : filler.apply(rateSpec);
|
||||
// this.tokens = this.filler.getTokenPool();
|
||||
|
||||
@ -163,7 +159,7 @@ public class HybridRateLimiter implements Startable, RateLimiter {
|
||||
}
|
||||
|
||||
|
||||
protected void init(ActivityDef activityDef) {
|
||||
protected void init(NBNamedElement activityDef) {
|
||||
this.delayGauge = ActivityMetrics.gauge(activityDef, label + ".waittime", new RateLimiters.WaitTimeGauge(this));
|
||||
this.avgRateGauge = ActivityMetrics.gauge(activityDef, label + ".config.cyclerate", new RateLimiters.RateGauge(this));
|
||||
this.burstRateGauge = ActivityMetrics.gauge(activityDef, label + ".config.burstrate", new RateLimiters.BurstRateGauge(this));
|
||||
|
@ -107,7 +107,7 @@ public class InlineTokenPool {
|
||||
ByteBuffer logbuf = getBuffer();
|
||||
apply(rateSpec);
|
||||
logger.debug("initialized token pool: " + this + " for rate:" + rateSpec);
|
||||
this.refillTimer = ActivityMetrics.timer(def, "tokenfiller");
|
||||
this.refillTimer = ActivityMetrics.timer(def, "tokenfiller",4);
|
||||
}
|
||||
|
||||
public InlineTokenPool(long poolsize, double burstRatio, ActivityDef def) {
|
||||
@ -116,7 +116,7 @@ public class InlineTokenPool {
|
||||
this.burstRatio = burstRatio;
|
||||
this.maxActiveAndBurstSize = (long) (maxActivePoolSize * burstRatio);
|
||||
this.maxBurstPoolSize = maxActiveAndBurstSize - maxActivePoolSize;
|
||||
this.refillTimer = ActivityMetrics.timer(def, "tokenfiller");
|
||||
this.refillTimer = ActivityMetrics.timer(def, "tokenfiller",4);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -17,14 +17,14 @@
|
||||
package io.nosqlbench.engine.api.activityapi.ratelimits;
|
||||
|
||||
import com.codahale.metrics.Gauge;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import io.nosqlbench.api.NBNamedElement;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
public class RateLimiters {
|
||||
private final static Logger logger = LogManager.getLogger(RateLimiters.class);
|
||||
|
||||
public static synchronized RateLimiter createOrUpdate(ActivityDef def, String label, RateLimiter extant, RateSpec spec) {
|
||||
public static synchronized RateLimiter createOrUpdate(NBNamedElement def, String label, RateLimiter extant, RateSpec spec) {
|
||||
|
||||
if (extant == null) {
|
||||
RateLimiter rateLimiter= new HybridRateLimiter(def, label, spec);
|
||||
@ -38,7 +38,7 @@ public class RateLimiters {
|
||||
}
|
||||
}
|
||||
|
||||
public static synchronized RateLimiter create(ActivityDef def, String label, String specString) {
|
||||
public static synchronized RateLimiter create(NBNamedElement def, String label, String specString) {
|
||||
return createOrUpdate(def, label, null, new RateSpec(specString));
|
||||
}
|
||||
|
||||
|
@ -16,7 +16,7 @@
|
||||
|
||||
package io.nosqlbench.engine.api.activityapi.ratelimits;
|
||||
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.api.NBNamedElement;
|
||||
import io.nosqlbench.nb.annotations.Service;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -59,12 +59,9 @@ public class ThreadDrivenTokenPool implements TokenPool {
|
||||
private volatile long waitingPool;
|
||||
private RateSpec rateSpec;
|
||||
private long nanosPerOp;
|
||||
// private long debugTrigger=0L;
|
||||
// private long debugRate=1000000000;
|
||||
private long blocks = 0L;
|
||||
|
||||
private TokenFiller filler;
|
||||
private final ActivityDef activityDef;
|
||||
|
||||
/**
|
||||
* This constructor tries to pick reasonable defaults for the token pool for
|
||||
@ -73,9 +70,8 @@ public class ThreadDrivenTokenPool implements TokenPool {
|
||||
*
|
||||
* @param rateSpec a {@link RateSpec}
|
||||
*/
|
||||
public ThreadDrivenTokenPool(RateSpec rateSpec, ActivityDef activityDef) {
|
||||
this.activityDef = activityDef;
|
||||
apply(rateSpec);
|
||||
public ThreadDrivenTokenPool(RateSpec rateSpec, NBNamedElement named) {
|
||||
apply(named,rateSpec);
|
||||
logger.debug("initialized token pool: " + this + " for rate:" + rateSpec);
|
||||
// filler.start();
|
||||
}
|
||||
@ -87,7 +83,7 @@ public class ThreadDrivenTokenPool implements TokenPool {
|
||||
* @param rateSpec The rate specifier.
|
||||
*/
|
||||
@Override
|
||||
public synchronized TokenPool apply(RateSpec rateSpec) {
|
||||
public synchronized TokenPool apply(NBNamedElement named, RateSpec rateSpec) {
|
||||
this.rateSpec = rateSpec;
|
||||
this.maxActivePool = Math.max((long) 1E6, (long) ((double) rateSpec.getNanosPerOp() * MIN_CONCURRENT_OPS));
|
||||
this.maxOverActivePool = (long) (maxActivePool * rateSpec.getBurstRatio());
|
||||
@ -95,7 +91,7 @@ public class ThreadDrivenTokenPool implements TokenPool {
|
||||
|
||||
this.burstPoolSize = maxOverActivePool - maxActivePool;
|
||||
this.nanosPerOp = rateSpec.getNanosPerOp();
|
||||
this.filler = (this.filler == null) ? new TokenFiller(rateSpec, this, activityDef) : filler.apply(rateSpec);
|
||||
this.filler = (this.filler == null) ? new TokenFiller(rateSpec, this, named, 3) : filler.apply(rateSpec);
|
||||
notifyAll();
|
||||
return this;
|
||||
}
|
||||
|
@ -17,7 +17,7 @@
|
||||
package io.nosqlbench.engine.api.activityapi.ratelimits;
|
||||
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.api.NBNamedElement;
|
||||
import io.nosqlbench.engine.api.metrics.ActivityMetrics;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -46,13 +46,11 @@ public class TokenFiller implements Runnable {
|
||||
* By default, this rate is at least every millisecond +- scheduling jitter
|
||||
* in the JVM.
|
||||
*
|
||||
* @param rateSpec A {@link RateSpec}
|
||||
* @param def An {@link ActivityDef}
|
||||
*/
|
||||
public TokenFiller(RateSpec rateSpec, ThreadDrivenTokenPool tokenPool, ActivityDef def) {
|
||||
public TokenFiller(RateSpec rateSpec, ThreadDrivenTokenPool tokenPool, NBNamedElement named, int hdrdigits) {
|
||||
this.rateSpec = rateSpec;
|
||||
this.tokenPool = tokenPool;
|
||||
this.timer = ActivityMetrics.timer(def, "tokenfiller");
|
||||
this.timer = ActivityMetrics.timer(named, "tokenfiller", hdrdigits);
|
||||
}
|
||||
|
||||
public TokenFiller apply(RateSpec rateSpec) {
|
||||
|
@ -16,8 +16,11 @@
|
||||
|
||||
package io.nosqlbench.engine.api.activityapi.ratelimits;
|
||||
|
||||
import io.nosqlbench.api.NBNamedElement;
|
||||
|
||||
public interface TokenPool {
|
||||
TokenPool apply(RateSpec rateSpec);
|
||||
|
||||
TokenPool apply(NBNamedElement named, RateSpec rateSpec);
|
||||
|
||||
double getBurstRatio();
|
||||
|
||||
|
@ -42,7 +42,7 @@ public class ExceptionHistoMetrics {
|
||||
synchronized (histos) {
|
||||
h = histos.computeIfAbsent(
|
||||
name,
|
||||
k -> ActivityMetrics.histogram(activityDef, "errorhistos." + name)
|
||||
k -> ActivityMetrics.histogram(activityDef, "errorhistos." + name, activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -41,7 +41,7 @@ public class ExceptionTimerMetrics {
|
||||
synchronized (timers) {
|
||||
timer = timers.computeIfAbsent(
|
||||
name,
|
||||
k -> ActivityMetrics.timer(activityDef, "exceptions." + name)
|
||||
k -> ActivityMetrics.timer(activityDef, "exceptions." + name, activityDef.getParams().getOptionalInteger("hdr_digits").orElse(4))
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -16,7 +16,7 @@
|
||||
|
||||
package io.nosqlbench.engine.api.activityapi.ratelimits;
|
||||
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.api.NBNamedElement;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
@ -24,8 +24,8 @@ public class TestableHybridRateLimiter extends HybridRateLimiter {
|
||||
|
||||
private final AtomicLong clock;
|
||||
|
||||
public TestableHybridRateLimiter(AtomicLong clock, RateSpec rateSpec, ActivityDef def) {
|
||||
setActivityDef(def);
|
||||
public TestableHybridRateLimiter(AtomicLong clock, RateSpec rateSpec, NBNamedElement def) {
|
||||
super(def, "test", rateSpec);
|
||||
applyRateSpec(rateSpec);
|
||||
setLabel("test");
|
||||
this.clock = clock;
|
||||
|
@ -16,6 +16,7 @@
|
||||
|
||||
package io.nosqlbench.engine.api.activityapi.ratelimits;
|
||||
|
||||
import io.nosqlbench.api.NBNamedElement;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@ -59,7 +60,12 @@ public class TokenPoolTest {
|
||||
assertThat(p.getWaitTime()).isEqualTo(10000000L);
|
||||
|
||||
RateSpec s2 = new RateSpec(1000000L, 1.10D);
|
||||
p.apply(s2);
|
||||
p.apply(new NBNamedElement() {
|
||||
@Override
|
||||
public String getName() {
|
||||
return "test";
|
||||
}
|
||||
},s2);
|
||||
|
||||
|
||||
}
|
||||
|
@ -14,5 +14,8 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
var sum= adder.getSum(12,34);
|
||||
print('sum is ' + sum);
|
||||
package io.nosqlbench.api;
|
||||
|
||||
public interface NBNamedElement {
|
||||
String getName();
|
||||
}
|
@ -16,6 +16,7 @@
|
||||
|
||||
package io.nosqlbench.engine.api.activityimpl;
|
||||
|
||||
import io.nosqlbench.api.NBNamedElement;
|
||||
import io.nosqlbench.engine.api.util.Unit;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -34,7 +35,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
||||
* Essentially, ActivityDef is just a type-aware wrapper around a thread-safe parameter map,
|
||||
* with an atomic change counter which can be used to signal changes to observers.</p>
|
||||
*/
|
||||
public class ActivityDef {
|
||||
public class ActivityDef implements NBNamedElement {
|
||||
|
||||
// milliseconds between cycles per thread, for slow tests only
|
||||
public static final String DEFAULT_ALIAS = "UNNAMEDACTIVITY";
|
||||
@ -208,4 +209,8 @@ public class ActivityDef {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
return getAlias();
|
||||
}
|
||||
}
|
||||
|
@ -17,11 +17,11 @@
|
||||
package io.nosqlbench.engine.api.metrics;
|
||||
|
||||
import com.codahale.metrics.*;
|
||||
import io.nosqlbench.api.NBNamedElement;
|
||||
import io.nosqlbench.engine.api.activityapi.core.MetricRegistryService;
|
||||
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.engine.api.util.Unit;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import javax.script.ScriptContext;
|
||||
import java.io.File;
|
||||
@ -62,17 +62,17 @@ public class ActivityMetrics {
|
||||
/**
|
||||
* Register a named metric for an activity, synchronized on the activity
|
||||
*
|
||||
* @param activityDef The activity def that the metric will be for
|
||||
* @param named The activity def that the metric will be for
|
||||
* @param name The full metric name
|
||||
* @param metricProvider A function to actually create the metric if needed
|
||||
* @return a Metric, or null if the metric for the name was already present
|
||||
*/
|
||||
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
|
||||
private static Metric register(ActivityDef activityDef, String name, MetricProvider metricProvider) {
|
||||
String fullMetricName = activityDef.getAlias() + "." + name;
|
||||
private static Metric register(NBNamedElement named, String name, MetricProvider metricProvider) {
|
||||
String fullMetricName = named.getName() + "." + name;
|
||||
Metric metric = get().getMetrics().get(fullMetricName);
|
||||
if (metric == null) {
|
||||
synchronized (activityDef) {
|
||||
synchronized (named) {
|
||||
metric = get().getMetrics().get(fullMetricName);
|
||||
if (metric == null) {
|
||||
metric = metricProvider.getMetric();
|
||||
@ -109,17 +109,17 @@ public class ActivityMetrics {
|
||||
* <p>This method ensures that if multiple threads attempt to create the same-named metric on a given activity,
|
||||
* that only one of them succeeds.</p>
|
||||
*
|
||||
* @param activityDef an associated activity def
|
||||
* @param named an associated activity def
|
||||
* @param name a simple, descriptive name for the timer
|
||||
* @return the timer, perhaps a different one if it has already been registered
|
||||
*/
|
||||
public static Timer timer(ActivityDef activityDef, String name) {
|
||||
String fullMetricName = activityDef.getAlias() + "." + name;
|
||||
Timer registeredTimer = (Timer) register(activityDef, name, () ->
|
||||
public static Timer timer(NBNamedElement named, String name, int hdrdigits) {
|
||||
String fullMetricName = named.getName() + "." + name;
|
||||
Timer registeredTimer = (Timer) register(named, name, () ->
|
||||
new NicerTimer(fullMetricName,
|
||||
new DeltaHdrHistogramReservoir(
|
||||
fullMetricName,
|
||||
activityDef.getParams().getOptionalInteger(HDRDIGITS_PARAM).orElse(_HDRDIGITS)
|
||||
hdrdigits
|
||||
)
|
||||
));
|
||||
return registeredTimer;
|
||||
@ -145,18 +145,18 @@ public class ActivityMetrics {
|
||||
* <p>This method ensures that if multiple threads attempt to create the same-named metric on a given activity,
|
||||
* that only one of them succeeds.</p>
|
||||
*
|
||||
* @param activityDef an associated activity def
|
||||
* @param named an associated activity def
|
||||
* @param name a simple, descriptive name for the histogram
|
||||
* @return the histogram, perhaps a different one if it has already been registered
|
||||
*/
|
||||
public static Histogram histogram(ActivityDef activityDef, String name) {
|
||||
String fullMetricName = activityDef.getAlias() + "." + name;
|
||||
return (Histogram) register(activityDef, name, () ->
|
||||
public static Histogram histogram(NBNamedElement named, String name, int hdrdigits) {
|
||||
String fullMetricName = named.getName() + "." + name;
|
||||
return (Histogram) register(named, name, () ->
|
||||
new NicerHistogram(
|
||||
fullMetricName,
|
||||
new DeltaHdrHistogramReservoir(
|
||||
fullMetricName,
|
||||
activityDef.getParams().getOptionalInteger(HDRDIGITS_PARAM).orElse(_HDRDIGITS)
|
||||
hdrdigits
|
||||
)
|
||||
));
|
||||
}
|
||||
@ -177,12 +177,12 @@ public class ActivityMetrics {
|
||||
* <p>This method ensures that if multiple threads attempt to create the same-named metric on a given activity,
|
||||
* that only one of them succeeds.</p>
|
||||
*
|
||||
* @param activityDef an associated activity def
|
||||
* @param named an associated activity def
|
||||
* @param name a simple, descriptive name for the counter
|
||||
* @return the counter, perhaps a different one if it has already been registered
|
||||
*/
|
||||
public static Counter counter(ActivityDef activityDef, String name) {
|
||||
return (Counter) register(activityDef, name, Counter::new);
|
||||
public static Counter counter(NBNamedElement named, String name) {
|
||||
return (Counter) register(named, name, Counter::new);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -190,12 +190,12 @@ public class ActivityMetrics {
|
||||
* <p>This method ensures that if multiple threads attempt to create the same-named metric on a given activity,
|
||||
* that only one of them succeeds.</p>
|
||||
*
|
||||
* @param activityDef an associated activity def
|
||||
* @param named an associated activity def
|
||||
* @param name a simple, descriptive name for the meter
|
||||
* @return the meter, perhaps a different one if it has already been registered
|
||||
*/
|
||||
public static Meter meter(ActivityDef activityDef, String name) {
|
||||
return (Meter) register(activityDef, name, Meter::new);
|
||||
public static Meter meter(NBNamedElement named, String name) {
|
||||
return (Meter) register(named, name, Meter::new);
|
||||
}
|
||||
|
||||
private static MetricRegistry get() {
|
||||
@ -211,8 +211,8 @@ public class ActivityMetrics {
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public static <T> Gauge<T> gauge(ActivityDef activityDef, String name, Gauge<T> gauge) {
|
||||
return (Gauge<T>) register(activityDef, name, () -> gauge);
|
||||
public static <T> Gauge<T> gauge(NBNamedElement named, String name, Gauge<T> gauge) {
|
||||
return (Gauge<T>) register(named, name, () -> gauge);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@ -350,8 +350,8 @@ public class ActivityMetrics {
|
||||
new MetricsRegistryMount(getMetricRegistry(), subRegistry, mountPrefix);
|
||||
}
|
||||
|
||||
public static void removeActivityMetrics(ActivityDef activityDef) {
|
||||
get().getMetrics().keySet().stream().filter(s -> s.startsWith(activityDef.getAlias() + "."))
|
||||
public static void removeActivityMetrics(NBNamedElement named) {
|
||||
get().getMetrics().keySet().stream().filter(s -> s.startsWith(named.getName() + "."))
|
||||
.forEach(get()::remove);
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user