mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2024-12-22 23:23:56 -06:00
partial working optimo, something is still broken in the search behavior
This commit is contained in:
parent
6b85515715
commit
e64de7476e
@ -107,7 +107,8 @@ public class DiagDriverAdapter extends BaseDriverAdapter<DiagOp, DiagSpace> impl
|
||||
|
||||
@Override
|
||||
public List<OpTemplate> getSyntheticOpTemplates(OpsDocList opsDocList, Map<String, Object> params) {
|
||||
return OpsLoader.loadString("log:level=INFO", OpTemplateFormat.inline, params,null).getOps();
|
||||
return OpsLoader.loadString("noop: noop", OpTemplateFormat.inline, params,null).getOps();
|
||||
// return OpsLoader.loadString("log:level=INFO", OpTemplateFormat.inline, params,null).getOps();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -40,9 +40,5 @@ public enum RateLimiters {
|
||||
return extant;
|
||||
}
|
||||
|
||||
public static synchronized RateLimiter create(final NBComponent def, final String label, final String specString) {
|
||||
return RateLimiters.createOrUpdate(def, null, new SimRateSpec(specString));
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright (c) 2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.engine.api.activityapi.ratelimits.simrate;
|
||||
|
||||
import io.nosqlbench.api.engine.activityimpl.ParameterMap;
|
||||
|
||||
public class CycleRateSpec extends SimRateSpec {
|
||||
public CycleRateSpec(double opsPerSec, double burstRatio) {
|
||||
super(opsPerSec, burstRatio);
|
||||
}
|
||||
|
||||
public CycleRateSpec(double opsPerSec, double burstRatio, Verb type) {
|
||||
super(opsPerSec, burstRatio, type);
|
||||
}
|
||||
|
||||
public CycleRateSpec(ParameterMap.NamedParameter tuple) {
|
||||
super(tuple);
|
||||
}
|
||||
|
||||
public CycleRateSpec(String spec) {
|
||||
super(spec);
|
||||
}
|
||||
}
|
@ -24,6 +24,7 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.temporal.ChronoUnit;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.locks.LockSupport;
|
||||
@ -71,6 +72,7 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
|
||||
private long blocks;
|
||||
|
||||
private final ReentrantLock fillerLock = new ReentrantLock(false);
|
||||
|
||||
private AtomicLong cumulativeWaitTimeTicks = new AtomicLong(0L);
|
||||
private long startTime;
|
||||
|
||||
@ -80,43 +82,6 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
|
||||
startFiller();
|
||||
}
|
||||
|
||||
private void startFiller() {
|
||||
try {
|
||||
applySpec(spec);
|
||||
fillerLock.lock();
|
||||
if (this.filler != null) {
|
||||
logger.debug("filler already started, no changes");
|
||||
return;
|
||||
}
|
||||
this.filler = new Thread(new FillerRunnable());
|
||||
filler.setName("FILLER");
|
||||
filler.setUncaughtExceptionHandler(this);
|
||||
filler.start();
|
||||
} finally {
|
||||
fillerLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void stopFiller() {
|
||||
try {
|
||||
fillerLock.lock();
|
||||
if (filler == null) {
|
||||
logger.debug("filler already stopped, no changes");
|
||||
return;
|
||||
}
|
||||
running = false;
|
||||
logger.trace("STARTED awaiting filler thread join");
|
||||
filler.join();
|
||||
logger.trace("FINISHED awaiting filler thread join");
|
||||
filler = null;
|
||||
accumulateStats();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
fillerLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public long refill() {
|
||||
try {
|
||||
|
||||
@ -131,7 +96,7 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
|
||||
if (intOverFlowNanoTokens > 0) {
|
||||
waitingPool.addAndGet(spec.nanosToTicks(intOverFlowNanoTokens));
|
||||
newNanoTokens -= intOverFlowNanoTokens;
|
||||
logger.warn(() -> "timer overflow with extra tokens=" + intOverFlowNanoTokens);
|
||||
// logger.warn(() -> "timer overflow with extra tokens=" + intOverFlowNanoTokens);
|
||||
}
|
||||
int newTokens = spec.nanosToTicks(newNanoTokens);
|
||||
|
||||
@ -163,7 +128,6 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
|
||||
|
||||
int burstFillAllowed = (int) (refillFactor * this.burstPoolSize);
|
||||
|
||||
|
||||
burstFillAllowed = Math.min(this.maxOverActivePool - this.activePool.availablePermits(), burstFillAllowed);
|
||||
|
||||
// we can only burst up to our burst limit, but only as much time as we have in the waiting pool already
|
||||
@ -182,27 +146,39 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
|
||||
// }
|
||||
// System.out.println();
|
||||
|
||||
long waiting = this.activePool.availablePermits() + this.waitingPool.get();
|
||||
return waiting;
|
||||
// long waiting = this.activePool.availablePermits() + this.waitingPool.get();
|
||||
// return waiting;
|
||||
} catch (Exception e) {
|
||||
logger.error(e);
|
||||
} finally {
|
||||
fillerLock.unlock();
|
||||
|
||||
long waiting = this.activePool.availablePermits() + this.waitingPool.get();
|
||||
return waiting;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void applyRateSpec(SimRateSpec updatingSimRateSpec) {
|
||||
logger.info("rate spec:\n" + updatingSimRateSpec);
|
||||
try {
|
||||
fillerLock.lock();
|
||||
|
||||
if (null == updatingSimRateSpec) throw new RuntimeException("RateSpec must be defined");
|
||||
|
||||
if (updatingSimRateSpec.verb == SimRateSpec.Verb.stop || updatingSimRateSpec.verb == SimRateSpec.Verb.restart) {
|
||||
if (filler != null) {
|
||||
stopFiller();
|
||||
}
|
||||
if (filler != null) {
|
||||
stopFiller();
|
||||
}
|
||||
applySpec(updatingSimRateSpec);
|
||||
this.spec = updatingSimRateSpec;
|
||||
// if (updatingSimRateSpec.verb == SimRateSpec.Verb.stop || updatingSimRateSpec.verb == SimRateSpec.Verb.restart) {
|
||||
// if (filler != null) {
|
||||
// stopFiller();
|
||||
// }
|
||||
// }
|
||||
|
||||
// convertTimeBase(spec, updatingSimRateSpec);
|
||||
initPools(spec);
|
||||
|
||||
if (updatingSimRateSpec.verb == SimRateSpec.Verb.start || updatingSimRateSpec.verb == SimRateSpec.Verb.restart) {
|
||||
if (filler == null) {
|
||||
startFiller();
|
||||
@ -215,28 +191,58 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* When a rate limiter is stopped in the midst of a reconfiguration, carry over the accumulated time
|
||||
* in the active pool after converting the time base. Extra time that won't fit because of any time-base
|
||||
* scaling is sent into the waiting pool automatically.
|
||||
*/
|
||||
private void convertTimeBase(SimRateSpec from, SimRateSpec to) {
|
||||
ChronoUnit fromUnit = from.unit;
|
||||
ChronoUnit toUnit = to.unit;
|
||||
|
||||
if (fromUnit == toUnit) {
|
||||
return;
|
||||
}
|
||||
|
||||
int drained = activePool.drainPermits();
|
||||
Duration drainedTime = Duration.of(drained, fromUnit);
|
||||
long totalNanos = (drainedTime.getSeconds() * 1_000_000_000) + drainedTime.getNano();
|
||||
|
||||
int newTicks = to.nanosToTicks(totalNanos);
|
||||
int ticksForActive = Math.min(newTicks, 1_000_000_000);
|
||||
long nanosForActive = totalNanos - to.ticksToNanos(ticksForActive);
|
||||
long nanosForWaiting = totalNanos - nanosForActive;
|
||||
|
||||
this.waitingPool.addAndGet(nanosForActive);
|
||||
this.activePool.release((int) nanosForWaiting);
|
||||
}
|
||||
|
||||
private void accumulateStats() {
|
||||
this.cumulativeWaitTimeTicks.addAndGet(this.waitingPool.get());
|
||||
}
|
||||
|
||||
@Override
|
||||
public Duration getWaitTimeDuration(){
|
||||
public Duration getWaitTimeDuration() {
|
||||
return Duration.of(waitingPool.get(), this.spec.unit);
|
||||
}
|
||||
|
||||
@Override
|
||||
public double getWaitTimeSeconds() {
|
||||
Duration wait = getWaitTimeDuration();
|
||||
return (double) wait.getSeconds() + (wait.getNano()/1_000_000_000d);
|
||||
Duration wait = getWaitTimeDuration();
|
||||
return (double) wait.getSeconds() + (wait.getNano() / 1_000_000_000d);
|
||||
}
|
||||
|
||||
public void applySpec(SimRateSpec simRateSpec) {
|
||||
ticksPerOp = simRateSpec.ticksPerOp();
|
||||
public void initPools(SimRateSpec simRateSpec) {
|
||||
maxActivePool = 1_000_000_000;
|
||||
maxOverActivePool = (int) (this.maxActivePool * simRateSpec.burstRatio());
|
||||
burstPoolSize = this.maxOverActivePool - this.maxActivePool;
|
||||
this.startTime = System.nanoTime();
|
||||
|
||||
this.activePool.drainPermits();
|
||||
ticksPerOp = simRateSpec.ticksPerOp();
|
||||
this.activePool.release(ticksPerOp); // Allow the first op to start immediately, but only the first
|
||||
this.waitingPool.set(0);
|
||||
|
||||
this.startTime = System.nanoTime();
|
||||
}
|
||||
|
||||
public long block() {
|
||||
@ -259,38 +265,68 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
|
||||
return spec;
|
||||
}
|
||||
|
||||
private void startFiller() {
|
||||
try {
|
||||
fillerLock.lock();
|
||||
initPools(spec);
|
||||
running = true;
|
||||
if (this.filler != null) {
|
||||
logger.debug("filler already started, no changes");
|
||||
return;
|
||||
}
|
||||
this.filler = new Thread(new FillerRunnable());
|
||||
filler.setName("FILLER");
|
||||
filler.setUncaughtExceptionHandler(this);
|
||||
filler.start();
|
||||
} finally {
|
||||
fillerLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
private void stopFiller() {
|
||||
try {
|
||||
fillerLock.lock();
|
||||
if (filler == null) {
|
||||
logger.debug("filler already stopped, no changes");
|
||||
return;
|
||||
}
|
||||
running = false;
|
||||
filler.join();
|
||||
filler = null;
|
||||
accumulateStats();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
fillerLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private final class FillerRunnable implements Runnable {
|
||||
private long fills = 0L;
|
||||
private int charat = 0;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
System.out.print("_");
|
||||
while (SimRate.this.running) {
|
||||
fills++;
|
||||
// if ((fills%100)==0) {
|
||||
// System.out.print(chars[charat++%chars.length]);
|
||||
// System.out.print(ANSI_CURSOR_LEFT);
|
||||
// System.out.flush();
|
||||
// }
|
||||
// logger.debug(() -> "refilling");
|
||||
while (running) {
|
||||
SimRate.this.refill();
|
||||
fills++;
|
||||
LockSupport.parkNanos(refillIntervalNanos);
|
||||
}
|
||||
System.out.println("stopping filler");
|
||||
logger.debug("shutting down refill thread");
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return String.format(
|
||||
"{ active:%d, max:%d, fill:'(%,3.1f%%)A (%,3.1f%%)B', wait_ns:%,d, blocks:%,d lock:%s}",
|
||||
this.activePool.availablePermits(), this.maxActivePool,
|
||||
"{ rate:%f active:%d, max:%d, fill:'(%,3.1f%%)A (%,3.1f%%)B', wait_ns:%,d, blocks:%,d lock:%s ticks:%d}",
|
||||
this.spec.getRate(), this.activePool.availablePermits(), this.maxActivePool,
|
||||
(double) this.activePool.availablePermits() / this.maxActivePool * 100.0,
|
||||
(double) this.activePool.availablePermits() / this.maxOverActivePool * 100.0,
|
||||
this.waitingPool.get(),
|
||||
this.blocks,
|
||||
this.fillerLock.isLocked() ? "LOCKED" : "UNLOCKED"
|
||||
this.fillerLock.isLocked() ? "LOCKED" : "UNLOCKED", spec.ticksPerOp()
|
||||
);
|
||||
|
||||
}
|
||||
@ -318,7 +354,7 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
|
||||
@Override
|
||||
public Duration getTotalWaitTimeDuration() {
|
||||
Duration d1 = Duration.of(waitingPool.get(), this.spec.unit);
|
||||
Duration d2 = Duration.of(cumulativeWaitTimeTicks.get(),this.spec.unit);
|
||||
Duration d2 = Duration.of(cumulativeWaitTimeTicks.get(), this.spec.unit);
|
||||
return d1.plus(d2);
|
||||
}
|
||||
|
||||
|
@ -153,15 +153,25 @@ public class SimRateSpec {
|
||||
}
|
||||
|
||||
public int nanosToTicks(long newNanoTokens) {
|
||||
if (newNanoTokens>Integer.MAX_VALUE) {
|
||||
throw new RuntimeException("time base error with nanoseconds to ticks, value (" + newNanoTokens + ") is too large (>2^31!)");
|
||||
}
|
||||
// if (newNanoTokens>Integer.MAX_VALUE) {
|
||||
// throw new RuntimeException("time base error with nanoseconds to ticks, value (" + newNanoTokens + ") is too large (>2^31!)");
|
||||
// }
|
||||
return switch (unit) {
|
||||
case NANOS -> (int) newNanoTokens;
|
||||
case MICROS -> (int) (newNanoTokens/1_000L);
|
||||
case MILLIS -> (int) (newNanoTokens/1_000_000L);
|
||||
case SECONDS -> (int) (newNanoTokens/1_000_000_000L);
|
||||
default -> throw new RuntimeException("invalid ChronoUnit for rate spec:" + unit);
|
||||
default -> throw new RuntimeException("invalid ChronoUnit for nanosToTicks:" + unit);
|
||||
};
|
||||
}
|
||||
|
||||
public long ticksToNanos(int newTicks) {
|
||||
return switch (unit) {
|
||||
case NANOS -> newTicks;
|
||||
case MICROS -> newTicks*1_000L;
|
||||
case MILLIS -> newTicks*1_000_000L;
|
||||
case SECONDS -> newTicks*1_000_000_000L;
|
||||
default -> throw new RuntimeException("invalid ChronoUnit for ticksToNanos:" + unit);
|
||||
};
|
||||
}
|
||||
|
||||
@ -267,19 +277,6 @@ public class SimRateSpec {
|
||||
return String.format("{ rate:'%s', burstRatio:'%.3f', SOPSS:'%s', BOPSS:'%s', verb:'%s' }", ratefmt, burstRatio, ratefmt, burstfmt, verb);
|
||||
}
|
||||
|
||||
public SimRateSpec withOpsPerSecond(double rate) {
|
||||
return new SimRateSpec(rate, this.burstRatio);
|
||||
}
|
||||
|
||||
public SimRateSpec withBurstRatio(double burstRatio) {
|
||||
return new SimRateSpec(this.opsPerSec, burstRatio);
|
||||
}
|
||||
|
||||
public SimRateSpec withVerb(Verb verb) {
|
||||
return new SimRateSpec(this.opsPerSec, this.burstRatio, verb);
|
||||
}
|
||||
|
||||
|
||||
public double getRate() {
|
||||
return this.opsPerSec;
|
||||
}
|
||||
|
@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright (c) 2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.engine.api.activityapi.ratelimits.simrate;
|
||||
|
||||
import io.nosqlbench.api.engine.activityimpl.ParameterMap;
|
||||
|
||||
public class StrideRateSpec extends SimRateSpec {
|
||||
public StrideRateSpec(double opsPerSec, double burstRatio) {
|
||||
super(opsPerSec, burstRatio);
|
||||
}
|
||||
|
||||
public StrideRateSpec(double opsPerSec, double burstRatio, Verb type) {
|
||||
super(opsPerSec, burstRatio, type);
|
||||
}
|
||||
|
||||
public StrideRateSpec(ParameterMap.NamedParameter tuple) {
|
||||
super(tuple);
|
||||
}
|
||||
|
||||
public StrideRateSpec(String spec) {
|
||||
super(spec);
|
||||
}
|
||||
}
|
@ -18,6 +18,7 @@ package io.nosqlbench.engine.api.activityimpl;
|
||||
|
||||
import io.nosqlbench.components.NBComponent;
|
||||
import io.nosqlbench.components.NBBaseComponent;
|
||||
import io.nosqlbench.components.events.ParamChange;
|
||||
import io.nosqlbench.engine.api.activityapi.core.*;
|
||||
import io.nosqlbench.engine.api.activityapi.core.progress.ActivityMetricProgressMeter;
|
||||
import io.nosqlbench.engine.api.activityapi.core.progress.ProgressMeterDisplay;
|
||||
@ -26,6 +27,7 @@ import io.nosqlbench.engine.api.activityapi.errorhandling.modular.NBErrorHandler
|
||||
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.SequencerType;
|
||||
import io.nosqlbench.engine.api.activityapi.ratelimits.RateLimiters;
|
||||
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.CycleRateSpec;
|
||||
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRateSpec;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
|
||||
@ -43,6 +45,7 @@ import io.nosqlbench.adapters.api.activityconfig.OpsLoader;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplate;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpTemplateFormat;
|
||||
import io.nosqlbench.adapters.api.activityconfig.yaml.OpsDocList;
|
||||
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.StrideRateSpec;
|
||||
import io.nosqlbench.engine.api.activityimpl.motor.RunStateTally;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.DryRunOpDispenserWrapper;
|
||||
@ -113,7 +116,7 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
|
||||
|
||||
@Override
|
||||
public synchronized void initActivity() {
|
||||
initOrUpdateRateLimiters(this.activityDef);
|
||||
// initOrUpdateRateLimiters(this.activityDef);
|
||||
}
|
||||
|
||||
public synchronized NBErrorHandler getErrorHandler() {
|
||||
@ -299,21 +302,27 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
|
||||
|
||||
@Override
|
||||
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
|
||||
initOrUpdateRateLimiters(activityDef);
|
||||
// initOrUpdateRateLimiters(activityDef);
|
||||
}
|
||||
|
||||
public synchronized void initOrUpdateRateLimiters(ActivityDef activityDef) {
|
||||
|
||||
|
||||
activityDef.getParams().getOptionalNamedParameter("striderate")
|
||||
.map(SimRateSpec::new)
|
||||
.ifPresent(spec -> strideLimiter = RateLimiters.createOrUpdate(this, strideLimiter, spec));
|
||||
.map(StrideRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
|
||||
|
||||
activityDef.getParams().getOptionalNamedParameter("cyclerate", "targetrate", "rate")
|
||||
.map(SimRateSpec::new).ifPresent(
|
||||
spec -> cycleLimiter = RateLimiters.createOrUpdate(this, cycleLimiter, spec));
|
||||
.map(CycleRateSpec::new).ifPresent(sr -> this.onEvent(new ParamChange<>(sr)));
|
||||
|
||||
}
|
||||
|
||||
public void createOrUpdateStrideLimiter(SimRateSpec spec) {
|
||||
strideLimiter = RateLimiters.createOrUpdate(this, strideLimiter, spec);
|
||||
}
|
||||
public void createOrUpdateCycleLimiter(SimRateSpec spec) {
|
||||
cycleLimiter = RateLimiters.createOrUpdate(this, cycleLimiter, spec);
|
||||
}
|
||||
|
||||
/**
|
||||
* Modify the provided ActivityDef with defaults for stride and cycles, if they haven't been provided, based on the
|
||||
* length of the sequence as determined by the provided ratios. Also, modify the ActivityDef with reasonable
|
||||
|
@ -34,7 +34,11 @@ import io.nosqlbench.api.engine.activityimpl.ActivityDef;
|
||||
import io.nosqlbench.api.errors.BasicError;
|
||||
import io.nosqlbench.api.errors.OpConfigError;
|
||||
import io.nosqlbench.api.labels.NBLabels;
|
||||
import io.nosqlbench.components.events.NBEvent;
|
||||
import io.nosqlbench.components.events.ParamChange;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.CycleRateSpec;
|
||||
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.StrideRateSpec;
|
||||
import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
|
||||
import io.nosqlbench.nb.annotations.ServiceSelector;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@ -238,4 +242,19 @@ public class StandardActivity<R extends Op, S> extends SimpleActivity implements
|
||||
public NBLabels getLabels() {
|
||||
return super.getLabels();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onEvent(NBEvent event) {
|
||||
switch(event) {
|
||||
case ParamChange<?> pc -> {
|
||||
switch (pc.value()) {
|
||||
case CycleRateSpec crs -> createOrUpdateCycleLimiter(crs);
|
||||
case StrideRateSpec srs -> createOrUpdateStrideLimiter(srs);
|
||||
default -> super.onEvent(event);
|
||||
}
|
||||
}
|
||||
default -> super.onEvent(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ import java.util.*;
|
||||
* was not recognized.
|
||||
*/
|
||||
public class SessionCommandParser {
|
||||
private final static Logger logger = LogManager.getLogger(SessionCommandParser.class);
|
||||
// private final static Logger logger = LogManager.getLogger(SessionCommandParser.class);
|
||||
|
||||
private static final String FRAGMENT = "fragment";
|
||||
private static final String SCRIPT = "script";
|
||||
@ -99,7 +99,7 @@ public class SessionCommandParser {
|
||||
} else if (NBCLIScenarioParser.isFoundWorkload(word, includes)) {
|
||||
NBCLIScenarioParser.parseScenarioCommand(arglist, RESERVED_WORDS, includes);
|
||||
} else {
|
||||
logger.warn("unrecognized Cmd: " + word);
|
||||
System.out.println("unrecognized Cmd: " + word); // instead of using logger due to init precedence
|
||||
return Optional.empty();
|
||||
}
|
||||
break;
|
||||
|
22
nb-api/src/main/java/io/nosqlbench/components/DownEvent.java
Normal file
22
nb-api/src/main/java/io/nosqlbench/components/DownEvent.java
Normal file
@ -0,0 +1,22 @@
|
||||
/*
|
||||
* Copyright (c) 2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.components;
|
||||
|
||||
import io.nosqlbench.components.events.NBEvent;
|
||||
|
||||
public interface DownEvent extends NBEvent {
|
||||
}
|
@ -20,11 +20,11 @@ package io.nosqlbench.components;
|
||||
|
||||
import io.nosqlbench.api.engine.metrics.instruments.NBMetric;
|
||||
import io.nosqlbench.api.labels.NBLabels;
|
||||
import io.nosqlbench.components.events.NBEvent;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
public class NBBaseComponent extends NBBaseComponentMetrics implements NBComponent {
|
||||
@ -137,4 +137,19 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void onEvent(NBEvent event) {
|
||||
logger.debug(() -> description() + " handling event " + event.toString());
|
||||
switch (event) {
|
||||
case UpEvent ue -> { if (parent!=null) parent.onEvent(ue); }
|
||||
case DownEvent de -> {
|
||||
for (NBComponent child : children) {
|
||||
child.onEvent(de);
|
||||
}
|
||||
}
|
||||
default -> logger.info("dropping event " + event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -33,7 +33,7 @@ import java.util.List;
|
||||
*
|
||||
* This interface includes more aspects of above by extension going forward.
|
||||
*/
|
||||
public interface NBComponent extends AutoCloseable, NBLabeledElement, NBComponentMetrics, NBComponentServices {
|
||||
public interface NBComponent extends AutoCloseable, NBLabeledElement, NBComponentMetrics, NBComponentServices, NBComponentEvents {
|
||||
|
||||
NBComponent EMPTY_COMPONENT = new NBBaseComponent(null);
|
||||
|
||||
|
@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Copyright (c) 2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.components;
|
||||
|
||||
import io.nosqlbench.components.events.NBEvent;
|
||||
|
||||
public interface NBComponentEvents {
|
||||
public void onEvent(NBEvent event);
|
||||
}
|
22
nb-api/src/main/java/io/nosqlbench/components/UpEvent.java
Normal file
22
nb-api/src/main/java/io/nosqlbench/components/UpEvent.java
Normal file
@ -0,0 +1,22 @@
|
||||
/*
|
||||
* Copyright (c) 2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.components;
|
||||
|
||||
import io.nosqlbench.components.events.NBEvent;
|
||||
|
||||
public interface UpEvent extends NBEvent {
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
/*
|
||||
* Copyright (c) 2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.components.events;
|
||||
|
||||
public interface NBEvent {
|
||||
}
|
@ -0,0 +1,21 @@
|
||||
/*
|
||||
* Copyright (c) 2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.components.events;
|
||||
|
||||
import io.nosqlbench.components.UpEvent;
|
||||
|
||||
public record ParamChange<T>(T value) implements UpEvent {}
|
@ -84,7 +84,10 @@ public class PerfWindowSampler {
|
||||
double[] vals = data[window][measuredItem];
|
||||
|
||||
if (criteria.get(measuredItem).delta) {
|
||||
return (vals[ENDS] - vals[STARTS]) / (vals[END_TIME] - vals[START_TIME])*1000.0d;
|
||||
double duration = (vals[END_TIME] - vals[START_TIME])/1000D;
|
||||
double increment = vals[ENDS] - vals[STARTS];
|
||||
|
||||
return increment / duration;
|
||||
} else {
|
||||
return vals[ENDS];
|
||||
}
|
||||
|
@ -24,7 +24,11 @@ import io.nosqlbench.api.engine.metrics.instruments.NBMetricTimer;
|
||||
import io.nosqlbench.api.optimizers.BobyqaOptimizerInstance;
|
||||
import io.nosqlbench.api.optimizers.MVResult;
|
||||
import io.nosqlbench.components.NBComponent;
|
||||
import io.nosqlbench.components.events.ParamChange;
|
||||
import io.nosqlbench.engine.api.activityapi.core.Activity;
|
||||
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.CycleRateSpec;
|
||||
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRate;
|
||||
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRateSpec;
|
||||
import io.nosqlbench.engine.core.lifecycle.scenario.direct.SCBaseScenario;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -51,7 +55,8 @@ public class SC_optimo extends SCBaseScenario {
|
||||
"cycles", String.valueOf(Long.MAX_VALUE),
|
||||
"threads", "1",
|
||||
"driver", "diag",
|
||||
"rate", "1"
|
||||
"rate", "1",
|
||||
"dryrun","op"
|
||||
));
|
||||
if (params.containsKey("workload")) {
|
||||
activityParams.put("workload", params.get("workload"));
|
||||
@ -67,13 +72,13 @@ public class SC_optimo extends SCBaseScenario {
|
||||
BobyqaOptimizerInstance bobby = create().bobyqaOptimizer();
|
||||
bobby.param("rate", 1.0d, 10000.d);
|
||||
bobby.param("threads", 1.0d, 1000.0d);
|
||||
bobby.setInitialRadius(10000.0).setStoppingRadius(0.001).setMaxEval(1000);
|
||||
bobby.param("noise", 100d, 200.0d);
|
||||
bobby.setInitialRadius(1000000.0).setStoppingRadius(0.001).setMaxEval(1000);
|
||||
|
||||
Activity flywheel = controller.start(activityParams);
|
||||
stdout.println("warming up for " + seconds + " seconds");
|
||||
controller.waitMillis(5000);
|
||||
|
||||
|
||||
/**
|
||||
* <P>This function is the objective function, and is responsible for applying
|
||||
* the parameters and yielding a result. The higher the returned result, the
|
||||
@ -83,38 +88,38 @@ public class SC_optimo extends SCBaseScenario {
|
||||
|
||||
PerfWindowSampler sampler = new PerfWindowSampler();
|
||||
NBMetricTimer result_success_timer = flywheel.find().timer("name:result_success");
|
||||
sampler.addDeltaTime("achieved_rate", result_success_timer::getCount, 1.0);
|
||||
sampler.addDeltaTime("achieved_rate", result_success_timer::getCount, 1000.0);
|
||||
final DeltaSnapshotReader snapshotter = result_success_timer.getDeltaReader();
|
||||
AtomicReference<ConvenientSnapshot> snapshot = new AtomicReference<>(snapshotter.getDeltaSnapshot());
|
||||
ValidAtOrBelow below15000 = ValidAtOrBelow.max(15000);
|
||||
sampler.addDirect(
|
||||
"p99latency",
|
||||
() -> below15000.applyAsDouble(snapshot.get().getP99ns()),
|
||||
-1.0,
|
||||
() -> snapshot.set(snapshotter.getDeltaSnapshot())
|
||||
);
|
||||
// ValidAtOrBelow below15000 = ValidAtOrBelow.max(15000);
|
||||
// sampler.addDirect(
|
||||
// "p99latency",
|
||||
// () -> below15000.applyAsDouble(snapshot.get().getP99ns()),
|
||||
// -1.0,
|
||||
// () -> snapshot.set(snapshotter.getDeltaSnapshot())
|
||||
// );
|
||||
sampler.startWindow();
|
||||
|
||||
ToDoubleFunction<double[]> f = new ToDoubleFunction<double[]>() {
|
||||
@Override
|
||||
public double applyAsDouble(double[] values) {
|
||||
stdout.println("params=" + Arrays.toString(values));
|
||||
int threads = (int) bobby.getParams().getValue("threads", values);
|
||||
double rate = bobby.getParams().getValue("rate", values);
|
||||
|
||||
stdout.println("setting threads to " + threads);
|
||||
int threads = (int) bobby.getParams().getValue("threads", values);
|
||||
stdout.println("SETTING threads to " + threads);
|
||||
flywheel.getActivityDef().setThreads(threads);
|
||||
|
||||
String ratespec = rate + ":1.1:restart";
|
||||
stdout.println("setting rate to " + ratespec);
|
||||
flywheel.getActivityDef().getParams().put("rate", ratespec);
|
||||
double rate = bobby.getParams().getValue("rate", values);
|
||||
stdout.println("SETTING rate to "+ rate);
|
||||
CycleRateSpec ratespec = new CycleRateSpec(rate, 1.1d, SimRateSpec.Verb.restart);
|
||||
flywheel.onEvent(new ParamChange<>(ratespec));
|
||||
|
||||
sampler.startWindow();
|
||||
stdout.println("waiting " + seconds + " seconds...");
|
||||
controller.waitMillis(seconds * 1000L);
|
||||
sampler.stopWindow();
|
||||
double value = sampler.getCurrentWindowValue();
|
||||
stdout.println(sampler.toString());
|
||||
stdout.println("RESULT: " + sampler);
|
||||
return value;
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user