incremental progress

This commit is contained in:
Jonathan Shook
2023-10-09 11:59:34 -05:00
parent 9fa711b7ab
commit 9e91a6201d
44 changed files with 710 additions and 175 deletions

View File

@@ -16,7 +16,6 @@
package io.nosqlbench.engine.api.activityapi.core; package io.nosqlbench.engine.api.activityapi.core;
import com.codahale.metrics.Timer;
import io.nosqlbench.components.NBComponent; import io.nosqlbench.components.NBComponent;
import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.engine.activityimpl.ParameterMap; import io.nosqlbench.api.engine.activityimpl.ParameterMap;
@@ -40,14 +39,6 @@ import java.util.function.Supplier;
*/ */
public interface Activity extends Comparable<Activity>, ActivityDefObserver, ProgressCapable, StateCapable, NBComponent { public interface Activity extends Comparable<Activity>, ActivityDefObserver, ProgressCapable, StateCapable, NBComponent {
/**
* Provide the activity with the controls needed to stop itself.
*
* @param activityController The dedicated control interface for this activity
*/
void setActivityController(ActivityController activityController);
/** /**
* Register an object which should be closed after this activity is shutdown. * Register an object which should be closed after this activity is shutdown.
* *

View File

@@ -78,7 +78,6 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
private RunState runState = RunState.Uninitialized; private RunState runState = RunState.Uninitialized;
private RateLimiter strideLimiter; private RateLimiter strideLimiter;
private RateLimiter cycleLimiter; private RateLimiter cycleLimiter;
private ActivityController activityController;
private ActivityInstrumentation activityInstrumentation; private ActivityInstrumentation activityInstrumentation;
private PrintWriter console; private PrintWriter console;
private long startedAtMillis; private long startedAtMillis;
@@ -208,17 +207,6 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
return getAlias().compareTo(o.getAlias()); return getAlias().compareTo(o.getAlias());
} }
@Override
public ActivityController getActivityController() {
return activityController;
}
@Override
public void setActivityController(ActivityController activityController) {
this.activityController = activityController;
}
@Override @Override
public void registerAutoCloseable(AutoCloseable closeable) { public void registerAutoCloseable(AutoCloseable closeable) {
this.closeables.add(closeable); this.closeables.add(closeable);
@@ -695,4 +683,5 @@ public class SimpleActivity extends NBBaseComponent implements Activity {
return tally; return tally;
} }
} }

View File

@@ -34,7 +34,6 @@ import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.api.errors.BasicError; import io.nosqlbench.api.errors.BasicError;
import io.nosqlbench.api.errors.OpConfigError; import io.nosqlbench.api.errors.OpConfigError;
import io.nosqlbench.api.labels.NBLabels; import io.nosqlbench.api.labels.NBLabels;
import io.nosqlbench.components.NBComponentLoader;
import io.nosqlbench.engine.api.activityapi.planning.OpSequence; import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
import io.nosqlbench.engine.api.activityimpl.SimpleActivity; import io.nosqlbench.engine.api.activityimpl.SimpleActivity;
import io.nosqlbench.nb.annotations.ServiceSelector; import io.nosqlbench.nb.annotations.ServiceSelector;

View File

@@ -86,4 +86,8 @@ public class StandardActivityType<A extends StandardActivity<?,?>> extends Simpl
} }
@Override
public void shutdownActivity() {
}
} }

View File

@@ -16,7 +16,7 @@
package io.nosqlbench.engine.core.lifecycle.activity; package io.nosqlbench.engine.core.lifecycle.activity;
import io.nosqlbench.engine.core.lifecycle.scenario.context.ActivitiesController; import io.nosqlbench.engine.core.lifecycle.scenario.context.ScenarioActivitiesController;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@@ -24,9 +24,9 @@ public class ActivitiesExceptionHandler implements Thread.UncaughtExceptionHandl
private static final Logger logger = LogManager.getLogger(ActivitiesExceptionHandler.class); private static final Logger logger = LogManager.getLogger(ActivitiesExceptionHandler.class);
private final ActivitiesController controller; private final ScenarioActivitiesController controller;
public ActivitiesExceptionHandler(ActivitiesController controller) { public ActivitiesExceptionHandler(ScenarioActivitiesController controller) {
this.controller = controller; this.controller = controller;
logger.debug(() -> "Activities exception handler starting up for executor '" + this.controller + "'"); logger.debug(() -> "Activities exception handler starting up for executor '" + this.controller + "'");
} }

View File

@@ -22,7 +22,7 @@ import io.nosqlbench.engine.api.activityapi.core.progress.StateCapable;
import io.nosqlbench.engine.api.metrics.IndicatorMode; import io.nosqlbench.engine.api.metrics.IndicatorMode;
import io.nosqlbench.api.engine.metrics.PeriodicRunnable; import io.nosqlbench.api.engine.metrics.PeriodicRunnable;
import io.nosqlbench.api.engine.util.Unit; import io.nosqlbench.api.engine.util.Unit;
import io.nosqlbench.engine.core.lifecycle.scenario.context.ActivitiesController; import io.nosqlbench.engine.core.lifecycle.scenario.context.ScenarioActivitiesController;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
@@ -35,14 +35,14 @@ public class ActivitiesProgressIndicator implements Runnable {
private final static Logger logger = LogManager.getLogger("PROGRESS"); private final static Logger logger = LogManager.getLogger("PROGRESS");
private final String indicatorSpec; private final String indicatorSpec;
private final ActivitiesController sc; private final ScenarioActivitiesController sc;
private PeriodicRunnable<ActivitiesProgressIndicator> runnable; private PeriodicRunnable<ActivitiesProgressIndicator> runnable;
private IndicatorMode indicatorMode = IndicatorMode.console; private IndicatorMode indicatorMode = IndicatorMode.console;
private final Set<String> seen = new HashSet<>(); private final Set<String> seen = new HashSet<>();
private long intervalMillis = 1L; private long intervalMillis = 1L;
public ActivitiesProgressIndicator(ActivitiesController sc, String indicatorSpec) { public ActivitiesProgressIndicator(ScenarioActivitiesController sc, String indicatorSpec) {
this.sc = sc; this.sc = sc;
this.indicatorSpec = indicatorSpec; this.indicatorSpec = indicatorSpec;
start(); start();

View File

@@ -57,7 +57,7 @@ import java.util.stream.Collectors;
* This allows the state tracking to work consistently for all observers.</p> * This allows the state tracking to work consistently for all observers.</p>
*/ */
public class ActivityExecutor implements NBLabeledElement, ActivityController, ParameterMap.Listener, ProgressCapable, Callable<ExecutionResult> { public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener, ProgressCapable, Callable<ExecutionResult> {
// TODO Encapsulate valid state transitions to be only modifiable within the appropriate type view. // TODO Encapsulate valid state transitions to be only modifiable within the appropriate type view.
@@ -81,7 +81,6 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
this.activity = activity; this.activity = activity;
this.activityDef = activity.getActivityDef(); this.activityDef = activity.getActivityDef();
activity.getActivityDef().getParams().addListener(this); activity.getActivityDef().getParams().addListener(this);
activity.setActivityController(this);
this.tally = activity.getRunStateTally(); this.tally = activity.getRunStateTally();
} }
@@ -257,16 +256,24 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
private void increaseActiveMotorCountUpToThreadParam(ActivityDef activityDef) { private void increaseActiveMotorCountUpToThreadParam(ActivityDef activityDef) {
// Create motor slots // Create motor slots
try {
while (motors.size() < activityDef.getThreads()) { while (motors.size() < activityDef.getThreads()) {
Motor motor = activity.getMotorDispenserDelegate().getMotor(activityDef, motors.size()); Motor motor = activity.getMotorDispenserDelegate().getMotor(activityDef, motors.size());
logger.trace(() -> "Starting cycle motor thread:" + motor); logger.trace(() -> "Starting cycle motor thread:" + motor);
motors.add(motor); motors.add(motor);
} }
} catch (Exception e) {
System.out.print("critical error while starting motors: " + e);
logger.error("critical error while starting motors:" + e,e);
throw new RuntimeException(e);
}
} }
private void reduceActiveMotorCountDownToThreadParam(ActivityDef activityDef) { private void reduceActiveMotorCountDownToThreadParam(ActivityDef activityDef) {
// Stop and remove extra motor slots // Stop and remove extra motor slots
if (activityDef.getThreads()==0) {
logger.warn("setting threads to zero is not advised. At least one thread has to be active to keep the activity alive.");
}
while (motors.size() > activityDef.getThreads()) { while (motors.size() > activityDef.getThreads()) {
Motor motor = motors.get(motors.size() - 1); Motor motor = motors.get(motors.size() - 1);
logger.trace(() -> "Stopping cycle motor thread:" + motor); logger.trace(() -> "Stopping cycle motor thread:" + motor);
@@ -364,29 +371,6 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
this.requestStopMotors(); this.requestStopMotors();
} }
@Override
public synchronized void stopActivityWithReasonAsync(String reason) {
logger.info(() -> "Stopping activity " + this.activityDef.getAlias() + ": " + reason);
this.exception = new RuntimeException("Stopping activity " + this.activityDef.getAlias() + ": " + reason);
logger.error("stopping with reason: " + exception);
requestStopMotors();
}
@Override
public synchronized void stopActivityWithErrorAsync(Throwable throwable) {
if (exception == null) {
this.exception = new RuntimeException(throwable);
logger.error("stopping on error: " + throwable.toString(), throwable);
} else {
if (activityDef.getParams().getOptionalBoolean("fullerrors").orElse(false)) {
logger.error("additional error: " + throwable.toString(), throwable);
} else {
logger.warn("summarized error (fullerrors=false): " + throwable.toString());
}
}
requestStopMotors();
}
@Override @Override
public ProgressMeterDisplay getProgressMeter() { public ProgressMeterDisplay getProgressMeter() {
return this.activity.getProgressMeter(); return this.activity.getProgressMeter();
@@ -589,6 +573,10 @@ public class ActivityExecutor implements NBLabeledElement, ActivityController, P
.build()); .build());
} }
public void awaitMotorsRunningOrTerminalState() {
awaitMotorsAtLeastRunning();
}
private class ThreadsGauge implements Gauge<Double> { private class ThreadsGauge implements Gauge<Double> {
public ThreadsGauge(ActivityExecutor activityExecutor) { public ThreadsGauge(ActivityExecutor activityExecutor) {
ActivityExecutor ae = activityExecutor; ActivityExecutor ae = activityExecutor;

View File

@@ -28,11 +28,11 @@ import java.util.stream.Collectors;
*/ */
public class ActivityBindings implements Bindings, ProxyObject { public class ActivityBindings implements Bindings, ProxyObject {
private final ActivitiesController scenario; private final ScenarioActivitiesController scenario;
private final Map<String, Bindings> elementMap = new HashMap<String, Bindings>(); private final Map<String, Bindings> elementMap = new HashMap<String, Bindings>();
public ActivityBindings(ActivitiesController activitiesController) { public ActivityBindings(ScenarioActivitiesController scenarioActivitiesController) {
this.scenario = activitiesController; this.scenario = scenarioActivitiesController;
} }
@Override @Override

View File

@@ -56,7 +56,7 @@ public class NBDefaultSceneFixtures implements NBSceneFixtures {
* a given scenario. A scenario doesn't complete unless until all activities * a given scenario. A scenario doesn't complete unless until all activities
* are complete or errored. * are complete or errored.
*/ */
private ActivitiesController controller; private ScenarioActivitiesController controller;
/* /*
* Extensions provide additional scripting capabilities which are not provided by the * Extensions provide additional scripting capabilities which are not provided by the
* scripting or other runtimes, or new ways of tapping into extant features. * scripting or other runtimes, or new ways of tapping into extant features.
@@ -68,7 +68,7 @@ public class NBDefaultSceneFixtures implements NBSceneFixtures {
private Reader in; private Reader in;
public NBDefaultSceneFixtures(ScenarioParams params, NBComponent parent, ActivitiesController controller, Extensions extensions, PrintWriter out, PrintWriter err, Reader in) { public NBDefaultSceneFixtures(ScenarioParams params, NBComponent parent, ScenarioActivitiesController controller, Extensions extensions, PrintWriter out, PrintWriter err, Reader in) {
this.params = params; this.params = params;
this.session = parent; this.session = parent;
this.controller = controller; this.controller = controller;
@@ -84,7 +84,7 @@ public class NBDefaultSceneFixtures implements NBSceneFixtures {
new NBSession( new NBSession(
new TestComponent("scene", name), "scene~"+name new TestComponent("scene", name), "scene~"+name
), ),
new ActivitiesController(), new ScenarioActivitiesController(),
Extensions.ofNone(), Extensions.ofNone(),
new PrintWriter(System.out), new PrintWriter(System.out),
new PrintWriter(System.err), new PrintWriter(System.err),
@@ -104,7 +104,7 @@ public class NBDefaultSceneFixtures implements NBSceneFixtures {
} }
@Override @Override
public ActivitiesController controller() { public ScenarioActivitiesController controller() {
return controller; return controller;
} }

View File

@@ -80,7 +80,7 @@ public class NBSceneBuffer implements NBSceneFixtures {
} }
@Override @Override
public ActivitiesController controller() { public ScenarioActivitiesController controller() {
return fixtures.controller(); return fixtures.controller();
} }

View File

@@ -27,7 +27,7 @@ public interface NBSceneFixtures {
NBComponent component(); NBComponent component();
ActivitiesController controller(); ScenarioActivitiesController controller();
Extensions extensions(); Extensions extensions();

View File

@@ -41,31 +41,31 @@ import java.util.stream.Collectors;
* A ScenarioController provides a way to start Activities, * A ScenarioController provides a way to start Activities,
* modify them while running, and forceStopMotors, pause or restart them. * modify them while running, and forceStopMotors, pause or restart them.
*/ */
public class ActivitiesController extends NBBaseComponent { public class ScenarioActivitiesController extends NBBaseComponent {
private static final Logger logger = LogManager.getLogger(ActivitiesController.class); private static final Logger logger = LogManager.getLogger(ScenarioActivitiesController.class);
private static final Logger scenariologger = LogManager.getLogger("SCENARIO"); private static final Logger scenariologger = LogManager.getLogger("SCENARIO");
private final ActivityLoader activityLoader; private final ActivityLoader activityLoader;
private final Map<String, ActivityRuntimeInfo> activityInfoMap = new ConcurrentHashMap<>(); private final Map<String, ActivityRuntimeInfo> activityInfoMap = new ConcurrentHashMap<>();
private final ExecutorService activitiesExecutor; private final ExecutorService executorService;
public ActivitiesController() { public ScenarioActivitiesController() {
super(new TestComponent("test","test")); super(new TestComponent("test","test"));
this.activityLoader = new ActivityLoader(); this.activityLoader = new ActivityLoader();
ActivitiesExceptionHandler exceptionHandler = new ActivitiesExceptionHandler(this); ActivitiesExceptionHandler exceptionHandler = new ActivitiesExceptionHandler(this);
IndexedThreadFactory indexedThreadFactory = new IndexedThreadFactory("ACTIVITY", exceptionHandler); IndexedThreadFactory indexedThreadFactory = new IndexedThreadFactory("ACTIVITY", exceptionHandler);
this.activitiesExecutor = Executors.newVirtualThreadPerTaskExecutor(); this.executorService = Executors.newVirtualThreadPerTaskExecutor();
} }
public ActivitiesController(NBComponent parent) { public ScenarioActivitiesController(NBComponent parent) {
super(parent); super(parent);
this.activityLoader = new ActivityLoader(); this.activityLoader = new ActivityLoader();
ActivitiesExceptionHandler exceptionHandler = new ActivitiesExceptionHandler(this); ActivitiesExceptionHandler exceptionHandler = new ActivitiesExceptionHandler(this);
IndexedThreadFactory indexedThreadFactory = new IndexedThreadFactory("ACTIVITY", exceptionHandler); IndexedThreadFactory indexedThreadFactory = new IndexedThreadFactory("ACTIVITY", exceptionHandler);
this.activitiesExecutor = Executors.newCachedThreadPool(indexedThreadFactory); this.executorService = Executors.newVirtualThreadPerTaskExecutor();
} }
/** /**
@@ -84,8 +84,9 @@ public class ActivitiesController extends NBBaseComponent {
if (!this.activityInfoMap.containsKey(activityDef.getAlias())) { if (!this.activityInfoMap.containsKey(activityDef.getAlias())) {
Activity activity = this.activityLoader.loadActivity(activityDef, this); Activity activity = this.activityLoader.loadActivity(activityDef, this);
ActivityExecutor executor = new ActivityExecutor(activity); ActivityExecutor executor = new ActivityExecutor(activity);
Future<ExecutionResult> startedActivity = activitiesExecutor.submit(executor); Future<ExecutionResult> startedActivity = executorService.submit(executor);
ActivityRuntimeInfo activityRuntimeInfo = new ActivityRuntimeInfo(activity, startedActivity, executor); ActivityRuntimeInfo activityRuntimeInfo = new ActivityRuntimeInfo(activity, startedActivity, executor);
activityRuntimeInfo.getActivityExecutor().awaitMotorsRunningOrTerminalState();
this.activityInfoMap.put(activity.getAlias(), activityRuntimeInfo); this.activityInfoMap.put(activity.getAlias(), activityRuntimeInfo);
} }
@@ -181,6 +182,10 @@ public class ActivitiesController extends NBBaseComponent {
runtimeInfo.stopActivity(); runtimeInfo.stopActivity();
} }
public synchronized void stop(Activity activity) {
stop(activity.getActivityDef());
}
/** /**
* <p>Stop an activity, given an activity def map. The only part of the map that is important is the * <p>Stop an activity, given an activity def map. The only part of the map that is important is the
* alias parameter. This method retains the map signature to provide convenience for scripting.</p> * alias parameter. This method retains the map signature to provide convenience for scripting.</p>
@@ -452,7 +457,7 @@ public class ActivitiesController extends NBBaseComponent {
public void shutdown() { public void shutdown() {
logger.debug(() -> "Requesting ScenarioController shutdown."); logger.debug(() -> "Requesting ScenarioController shutdown.");
this.activitiesExecutor.shutdownNow(); this.executorService.shutdownNow();
// try { // try {
// if (!this.activitiesExecutor.awaitTermination(5, TimeUnit.SECONDS)) { // if (!this.activitiesExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
// logger.info(() -> "Scenario is being forced to shutdown after waiting 5 seconds for graceful shutdown."); // logger.info(() -> "Scenario is being forced to shutdown after waiting 5 seconds for graceful shutdown.");

View File

@@ -21,17 +21,19 @@ package io.nosqlbench.engine.core.lifecycle.scenario.context;
import io.nosqlbench.components.NBComponent; import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.Extensions; import io.nosqlbench.engine.core.lifecycle.scenario.execution.Extensions;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.Reader; import java.io.Reader;
import java.util.Map; import java.util.Map;
public class SceneBuilder implements SceneBuilderFacets.ALL { public class SceneBuilder implements SceneBuilderFacets.ALL {
private Map<String,String> params; private Map<String,String> params;
private ActivitiesController controller; private ScenarioActivitiesController controller;
private Extensions extensions; private Extensions extensions;
private PrintWriter out; private PrintWriter out = new PrintWriter(System.out);
private PrintWriter err; private PrintWriter err = new PrintWriter(System.err);
private Reader in; private Reader in = new InputStreamReader(System.in);
private NBComponent component; private NBComponent component;
private NBSceneBuffer.IOType iotype; private NBSceneBuffer.IOType iotype;
@@ -47,7 +49,7 @@ public class SceneBuilder implements SceneBuilderFacets.ALL {
new NBDefaultSceneFixtures( new NBDefaultSceneFixtures(
ScenarioParams.of(this.params), ScenarioParams.of(this.params),
this.component, this.component,
((this.controller!=null) ? this.controller : new ActivitiesController(component)), ((this.controller!=null) ? this.controller : new ScenarioActivitiesController(component)),
this.extensions, this.extensions,
this.out, this.out,
this.err, this.err,
@@ -57,7 +59,7 @@ public class SceneBuilder implements SceneBuilderFacets.ALL {
@Override @Override
public SceneBuilder controller(ActivitiesController controller) { public SceneBuilder controller(ScenarioActivitiesController controller) {
this.controller = controller; this.controller = controller;
return this; return this;
} }

View File

@@ -44,7 +44,7 @@ public interface SceneBuilderFacets {
} }
public interface WantsController extends WantsStdin, WantsIoType { public interface WantsController extends WantsStdin, WantsIoType {
public WantsStdin controller(ActivitiesController controller); public WantsStdin controller(ScenarioActivitiesController controller);
} }

View File

@@ -17,7 +17,7 @@
package io.nosqlbench.engine.core.lifecycle.scenario.direct; package io.nosqlbench.engine.core.lifecycle.scenario.direct;
import io.nosqlbench.components.NBComponent; import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.core.lifecycle.scenario.context.ActivitiesController; import io.nosqlbench.engine.core.lifecycle.scenario.context.ScenarioActivitiesController;
import io.nosqlbench.engine.core.lifecycle.scenario.context.NBSceneFixtures; import io.nosqlbench.engine.core.lifecycle.scenario.context.NBSceneFixtures;
import io.nosqlbench.engine.core.lifecycle.scenario.context.ScenarioParams; import io.nosqlbench.engine.core.lifecycle.scenario.context.ScenarioParams;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.Extensions; import io.nosqlbench.engine.core.lifecycle.scenario.execution.Extensions;
@@ -32,7 +32,7 @@ public abstract class SCBaseScenario extends NBScenario {
protected Reader stdin; protected Reader stdin;
protected PrintWriter stdout; protected PrintWriter stdout;
protected Writer stderr; protected Writer stderr;
protected ActivitiesController controller; protected ScenarioActivitiesController controller;
protected ScenarioParams params; protected ScenarioParams params;
protected Extensions extensions; protected Extensions extensions;
@@ -60,7 +60,7 @@ public abstract class SCBaseScenario extends NBScenario {
* <LI>component, an {@link NBComponent} - The NB component upon which all metrics or other services are attached.</LI> * <LI>component, an {@link NBComponent} - The NB component upon which all metrics or other services are attached.</LI>
* <LI>stdin - a {@link Reader} representing the input buffer which would normally be {@link System#in} * <LI>stdin - a {@link Reader} representing the input buffer which would normally be {@link System#in}
* <LI>stdout, stderr</LI>- a {@link PrintWriter}; This can be buffered virtually, attached to {@link System#out} and {@link System#err} or both for IO tracing.</LI> * <LI>stdout, stderr</LI>- a {@link PrintWriter}; This can be buffered virtually, attached to {@link System#out} and {@link System#err} or both for IO tracing.</LI>
* <LI>controller - A dedicated {@link ActivitiesController} which can be used to define, start, top, and interact with activities.</LI> * <LI>controller - A dedicated {@link ScenarioActivitiesController} which can be used to define, start, top, and interact with activities.</LI>
* <LI>params - The {@link ScenarioParams} which have been passed to this scenario.</LI> * <LI>params - The {@link ScenarioParams} which have been passed to this scenario.</LI>
* <LI>extensions - A dedicated ahndle to the {@link Extensions} service.</LI> * <LI>extensions - A dedicated ahndle to the {@link Extensions} service.</LI>
* <LI><EM>all component services</EM> as this scenario IS a component. This includes all implemented methods in any of the {@link NBComponent} sub-interfaces.</EM> * <LI><EM>all component services</EM> as this scenario IS a component. This includes all implemented methods in any of the {@link NBComponent} sub-interfaces.</EM>

View File

@@ -26,7 +26,7 @@ import io.nosqlbench.components.NBBaseComponent;
import io.nosqlbench.components.NBComponentErrorHandler; import io.nosqlbench.components.NBComponentErrorHandler;
import io.nosqlbench.engine.core.annotation.Annotators; import io.nosqlbench.engine.core.annotation.Annotators;
import io.nosqlbench.engine.core.lifecycle.activity.ActivitiesProgressIndicator; import io.nosqlbench.engine.core.lifecycle.activity.ActivitiesProgressIndicator;
import io.nosqlbench.engine.core.lifecycle.scenario.context.ActivitiesController; import io.nosqlbench.engine.core.lifecycle.scenario.context.ScenarioActivitiesController;
import io.nosqlbench.engine.core.lifecycle.scenario.context.NBSceneBuffer; import io.nosqlbench.engine.core.lifecycle.scenario.context.NBSceneBuffer;
import io.nosqlbench.engine.core.lifecycle.scenario.context.NBSceneFixtures; import io.nosqlbench.engine.core.lifecycle.scenario.context.NBSceneFixtures;
import io.nosqlbench.engine.core.lifecycle.scenario.script.NBScriptedScenario; import io.nosqlbench.engine.core.lifecycle.scenario.script.NBScriptedScenario;
@@ -54,7 +54,7 @@ public abstract class NBScenario extends NBBaseComponent
private ScenarioMetadata scenarioMetadata; private ScenarioMetadata scenarioMetadata;
private ActivitiesController activitiesController; private ScenarioActivitiesController scenarioActivitiesController;
private Exception error; private Exception error;
private String progressInterval = "console:10s"; private String progressInterval = "console:10s";
private ActivitiesProgressIndicator activitiesProgressIndicator; private ActivitiesProgressIndicator activitiesProgressIndicator;
@@ -68,15 +68,15 @@ public abstract class NBScenario extends NBBaseComponent
} }
public void forceStopScenario(int i, boolean b) { public void forceStopScenario(int i, boolean b) {
activitiesController.forceStopScenario(i,b); scenarioActivitiesController.forceStopScenario(i,b);
} }
// public Map<String, String> getParams() { // public Map<String, String> getParams() {
// return this.params; // return this.params;
// } // }
public ActivitiesController getActivitiesController() { public ScenarioActivitiesController getActivitiesController() {
return this.activitiesController; return this.scenarioActivitiesController;
} }
public enum State { public enum State {
@@ -111,7 +111,7 @@ public abstract class NBScenario extends NBBaseComponent
*/ */
@Override @Override
public final ScenarioResult apply(NBSceneBuffer sctx) { public final ScenarioResult apply(NBSceneBuffer sctx) {
this.activitiesController=sctx.controller(); this.scenarioActivitiesController =sctx.controller();
this.scenarioShutdownHook = new ScenarioShutdownHook(this); this.scenarioShutdownHook = new ScenarioShutdownHook(this);
Runtime.getRuntime().addShutdownHook(this.scenarioShutdownHook); Runtime.getRuntime().addShutdownHook(this.scenarioShutdownHook);
@@ -127,24 +127,24 @@ public abstract class NBScenario extends NBBaseComponent
); );
if (!"disabled".equals(progressInterval) && progressInterval!=null && !progressInterval.isEmpty()) if (!"disabled".equals(progressInterval) && progressInterval!=null && !progressInterval.isEmpty())
this.activitiesProgressIndicator = new ActivitiesProgressIndicator(activitiesController, this.progressInterval); this.activitiesProgressIndicator = new ActivitiesProgressIndicator(scenarioActivitiesController, this.progressInterval);
ScenarioResult result = null; ScenarioResult result = null;
try { try {
runScenario(sctx.asFixtures()); runScenario(sctx.asFixtures());
final long awaitCompletionTime = 86400 * 365 * 1000L; final long awaitCompletionTime = 86400 * 365 * 1000L;
this.logger.debug("Awaiting completion of scenario and activities for {} millis.", awaitCompletionTime); this.logger.debug("Awaiting completion of scenario and activities for {} millis.", awaitCompletionTime);
this.activitiesController.awaitCompletion(awaitCompletionTime); this.scenarioActivitiesController.awaitCompletion(awaitCompletionTime);
} catch (Exception e) { } catch (Exception e) {
try { try {
activitiesController.forceStopScenario(3000, false); scenarioActivitiesController.forceStopScenario(3000, false);
} catch (final Exception eInner) { } catch (final Exception eInner) {
this.logger.debug("Found inner exception while forcing stop with rethrow=false: {}", eInner); this.logger.debug("Found inner exception while forcing stop with rethrow=false: {}", eInner);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
this.error = e; this.error = e;
} finally { } finally {
this.activitiesController.shutdown(); this.scenarioActivitiesController.shutdown();
this.endedAtMillis = System.currentTimeMillis(); this.endedAtMillis = System.currentTimeMillis();
result = new ScenarioResult( result = new ScenarioResult(
sctx, sctx,
@@ -183,7 +183,7 @@ public abstract class NBScenario extends NBBaseComponent
} else } else
this.logger.info( this.logger.info(
"Scenario completed successfully, with {} logical activities.", "Scenario completed successfully, with {} logical activities.",
activitiesController.getActivityExecutorMap().size() scenarioActivitiesController.getActivityExecutorMap().size()
); );
this.logger.info(() -> "scenario state: " + state); this.logger.info(() -> "scenario state: " + state);

View File

@@ -17,7 +17,7 @@
package io.nosqlbench.engine.core.lifecycle.scenario.script.bindings; package io.nosqlbench.engine.core.lifecycle.scenario.script.bindings;
import io.nosqlbench.api.engine.activityimpl.ActivityDef; import io.nosqlbench.api.engine.activityimpl.ActivityDef;
import io.nosqlbench.engine.core.lifecycle.scenario.context.ActivitiesController; import io.nosqlbench.engine.core.lifecycle.scenario.context.ScenarioActivitiesController;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.graalvm.polyglot.Value; import org.graalvm.polyglot.Value;
@@ -29,9 +29,9 @@ public class PolyglotScenarioController {
private static final Logger logger = LogManager.getLogger("SCENARIO/POLYGLOT"); private static final Logger logger = LogManager.getLogger("SCENARIO/POLYGLOT");
private final ActivitiesController controller; private final ScenarioActivitiesController controller;
public PolyglotScenarioController(ActivitiesController inner) { public PolyglotScenarioController(ScenarioActivitiesController inner) {
this.controller = inner; this.controller = inner;
} }

View File

@@ -203,6 +203,7 @@ public class ComputeFunctions extends NBBaseComponent {
public static double average_precision(int[] relevant, int[] actual, int k) { public static double average_precision(int[] relevant, int[] actual, int k) {
relevant = Arrays.copyOfRange(relevant,0,k); relevant = Arrays.copyOfRange(relevant,0,k);
int maxK = Math.min(k,actual.length); int maxK = Math.min(k,actual.length);
relevant = Arrays.copyOfRange(relevant,0,k);
HashSet<Integer> relevantSet = new HashSet<>(relevant.length); HashSet<Integer> relevantSet = new HashSet<>(relevant.length);
for (Integer i : relevant) { for (Integer i : relevant) {
relevantSet.add(i); relevantSet.add(i);
@@ -225,6 +226,7 @@ public class ComputeFunctions extends NBBaseComponent {
public static double average_precision(long[] relevant, long[] actual, int k) { public static double average_precision(long[] relevant, long[] actual, int k) {
relevant = Arrays.copyOfRange(relevant,0,k); relevant = Arrays.copyOfRange(relevant,0,k);
int maxK = Math.min(k,actual.length); int maxK = Math.min(k,actual.length);
relevant = Arrays.copyOfRange(relevant,0,k);
HashSet<Long> refset = new HashSet<>(relevant.length); HashSet<Long> refset = new HashSet<>(relevant.length);
for (Long i : relevant) { for (Long i : relevant) {
refset.add(i); refset.add(i);

View File

@@ -31,6 +31,11 @@ public class DoubleSummaryGauge implements NBMetricGauge, DoubleConsumer {
private final Stat stat; private final Stat stat;
private final DoubleSummaryStatistics stats; private final DoubleSummaryStatistics stats;
@Override
public String typeName() {
return "gauge";
}
public enum Stat { public enum Stat {
Min, Min,
Max, Max,

View File

@@ -29,5 +29,8 @@ public class NBBaseMetric implements NBMetric {
return this.labels; return this.labels;
} }
@Override
public String typeName() {
return "basetype";
}
} }

View File

@@ -51,6 +51,11 @@ public class NBFunctionGauge implements NBMetricGauge {
public String toString() { public String toString() {
return description(); return description();
} }
@Override
public String typeName() {
return "gauge";
}
} }

View File

@@ -23,4 +23,5 @@ public interface NBMetric extends Metric, NBLabeledElement {
default String getHandle() { default String getHandle() {
return this.getLabels().linearizeAsMetrics(); return this.getLabels().linearizeAsMetrics();
} }
String typeName();
} }

View File

@@ -32,4 +32,14 @@ public class NBMetricCounter extends Counter implements NBMetric {
public NBLabels getLabels() { public NBLabels getLabels() {
return labels; return labels;
} }
@Override
public String typeName() {
return "counter";
}
@Override
public String toString() {
return description();
}
} }

View File

@@ -42,4 +42,9 @@ public class NBMetricGaugeWrapper implements NBMetricGauge, NBMetric {
public NBLabels getLabels() { public NBLabels getLabels() {
return labels; return labels;
} }
@Override
public String typeName() {
return "gauge";
}
} }

View File

@@ -109,4 +109,14 @@ public class NBMetricHistogram extends Histogram implements DeltaSnapshotter, Hd
public NBLabels getLabels() { public NBLabels getLabels() {
return this.labels; return this.labels;
} }
@Override
public String typeName() {
return "histogram";
}
@Override
public String toString() {
return description();
}
} }

View File

@@ -31,4 +31,9 @@ public class NBMetricMeter extends Meter implements NBMetric {
public NBLabels getLabels() { public NBLabels getLabels() {
return labels; return labels;
} }
@Override
public String typeName() {
return "meter";
}
} }

View File

@@ -90,4 +90,9 @@ public class NBMetricTimer extends Timer implements DeltaSnapshotter, HdrDeltaHi
public String toString() { public String toString() {
return description(); return description();
} }
@Override
public String typeName() {
return "timer";
}
} }

View File

@@ -190,4 +190,8 @@ public class BobyqaOptimizerInstance extends NBBaseComponent {
public double[] getResult() { public double[] getResult() {
return result.getPoint(); return result.getPoint();
} }
public MVParams getParams() {
return params;
}
} }

View File

@@ -30,6 +30,15 @@ public class MVParams implements Iterable<MVParams.MVParam> {
return this; return this;
} }
public double getValue(String name, double[] values) {
for (int i = 0; i < paramList.size(); i++) {
if (paramList.get(i).name.toLowerCase().equals(name.toLowerCase())) {
return values[i];
}
}
throw new RuntimeException("no index found for param named '" + name + "', out of " + paramList);
}
public int size() { public int size() {
return paramList.size(); return paramList.size();
} }

View File

@@ -132,7 +132,7 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
if (getComponentMetrics().size()>0) { if (getComponentMetrics().size()>0) {
sb.append(System.lineSeparator()).append("metrics:"); sb.append(System.lineSeparator()).append("metrics:");
for (NBMetric componentMetric : getComponentMetrics()) { for (NBMetric componentMetric : getComponentMetrics()) {
sb.append(System.lineSeparator()).append("m ").append(componentMetric.toString()); sb.append(System.lineSeparator()).append(" ").append(componentMetric.toString());
} }
} }
return sb.toString(); return sb.toString();

View File

@@ -18,6 +18,8 @@ package io.nosqlbench.components;
*/ */
import io.nosqlbench.api.engine.metrics.instruments.NBMetric;
public class NBComponentFormats { public class NBComponentFormats {
public static String formatAsTree(NBBaseComponent base) { public static String formatAsTree(NBBaseComponent base) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
@@ -38,11 +40,18 @@ public class NBComponentFormats {
public void visit(NBComponent component, int depth) { public void visit(NBComponent component, int depth) {
String indent = " ".repeat(depth); String indent = " ".repeat(depth);
builder.append(indent).append(String.format("%03d %s",depth,component.description())); builder.append(indent).append(String.format("%03d %s",depth,component.description()));
String string = component.toString(); String componentNativeTypeToString = component.toString();
String[] split = string.split(System.lineSeparator()); String[] toStringLines = componentNativeTypeToString.split(System.lineSeparator());
for (String s : split) { for (String s : toStringLines) {
builder.append(System.lineSeparator()).append(indent).append(" >").append(s); builder.append(System.lineSeparator()).append(indent).append(" >").append(s);
} }
if (component.getComponentMetrics().size()>0) {
builder.append(System.lineSeparator()).append("metrics:");
for (NBMetric componentMetric : component.getComponentMetrics()) {
builder.append(indent).append(System.lineSeparator()).append(" ").append(componentMetric.toString());
}
}
builder.append(System.lineSeparator()); builder.append(System.lineSeparator());
} }
} }

View File

@@ -34,7 +34,9 @@ final class FanWriter extends Writer {
public void write(char @NotNull [] cbuf, int off, int len) throws IOException { public void write(char @NotNull [] cbuf, int off, int len) throws IOException {
for (Writer writer : writers) { for (Writer writer : writers) {
writer.write(cbuf, off, len); writer.write(cbuf, off, len);
writer.flush();
} }
} }
@Override @Override

View File

@@ -65,7 +65,7 @@ public class InterjectingCharArrayWriter extends CharArrayWriter {
if (time < 0) { if (time < 0) {
time = System.currentTimeMillis(); time = System.currentTimeMillis();
} }
if (times.length < timeidx) { if (times.length <= timeidx) {
long[] realloc = new long[times.length << 1]; long[] realloc = new long[times.length << 1];
System.arraycopy(times, 0, realloc, 0, times.length); System.arraycopy(times, 0, realloc, 0, times.length);
this.times = realloc; this.times = realloc;

View File

@@ -0,0 +1,56 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.scenarios;
import java.util.function.DoubleSupplier;
public abstract class BasePerfDimension implements PerfDimension {
private double weight;
private DoubleSupplier supplier;
private String name;
private Weighting weighting = Weighting.uniform;
public BasePerfDimension(String name, double weight, Weighting weighting, DoubleSupplier supplier) {
this.weight = weight;
this.supplier = supplier;
this.name = name;
this.weighting = weighting;
}
@Override
public double getWeight() {
return this.weight;
}
@Override
public DoubleSupplier getSupplier() {
return supplier;
}
@Override
public String getName() {
return name;
}
@Override
public Weighting getWeighting() {
return weighting;
}
@Override
public abstract String getValue();
}

View File

@@ -0,0 +1,30 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.scenarios;
import java.util.function.DoubleSupplier;
public interface PerfDimension {
public double getWeight();
public Weighting getWeighting();
public DoubleSupplier getSupplier();
public String getName();
public String getValue();
}

View File

@@ -0,0 +1,158 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.scenarios;
import java.util.ArrayList;
import java.util.List;
import java.util.function.DoubleSupplier;
/**
* This is a helper class that makes it easy to bundle up a combination of measurable
* factors and get a windowed sample from them. To use it, add your named data sources
* with their coefficients, and optionally a callback which resets the measurement
* buffers for the next time. When you call {@link #getCurrentWindowValue()}, all callbacks
* are used after the value computation is complete.
*
* <P>This is NOT thread safe!</P>
*/
public class PerfWindowSampler {
private final List<Criterion> criteria = new ArrayList<>();
private boolean openWindow = false;
private final static int STARTS = 0;
private final static int ENDS = 1;
private final static int WEIGHTED = 2;
private final static int START_TIME = 3;
private final static int END_TIME = 4;
private final static int ARYSIZE = END_TIME+1;
/**
* window, measure, START,STOP,WEIGHTED
*/
private double[][][] data;
private int window = -1;
void addDirect(String name, DoubleSupplier supplier, double weight, Runnable callback) {
this.criteria.add(new Criterion(name, supplier, weight, callback, false));
}
void addDirect(String name, DoubleSupplier supplier, double weight) {
addDirect(name, supplier, weight, () -> {
});
}
void addDeltaTime(String name, DoubleSupplier supplier, double weight, Runnable callback) {
this.criteria.add(new Criterion(name, supplier, weight, callback, true));
}
void addDeltaTime(String name, DoubleSupplier supplier, double weight) {
addDeltaTime(name, supplier, weight, () -> {
});
}
double getCurrentWindowValue() {
if (openWindow) {
throw new RuntimeException("invalid access to checkpoint value on open window.");
}
double product = 1.0d;
if (data==null) {
return Double.NaN;
}
double[][] values = data[window];
for (int i = 0; i < criteria.size(); i++) {
product *= values[i][WEIGHTED];
}
return product;
}
private double valueOf(int measuredItem) {
double[] vals = data[window][measuredItem];
if (criteria.get(measuredItem).delta) {
return (vals[ENDS] - vals[STARTS]) / (vals[END_TIME] - vals[START_TIME])*1000.0d;
} else {
return vals[ENDS];
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("PERF " + (openWindow ? "OPENWINDOW! " : "" ) + "sampler value =").append(getCurrentWindowValue()).append("\n");
for (int i = 0; i < criteria.size(); i++) {
Criterion criterion = criteria.get(i);
sb.append("->").append(criterion.name).append(" last=").append(valueOf(i)).append("\n");
}
return sb.toString();
}
public void startWindow() {
startWindow(System.currentTimeMillis());
}
public void startWindow(long now) {
openWindow=true;
window++;
if (this.data == null) {
this.data = new double[1][criteria.size()][ARYSIZE];
}
if (this.window >=data.length) {
double[][][] newary = new double[data.length<<1][criteria.size()][ARYSIZE];
System.arraycopy(data,0,newary,0,data.length);
this.data = newary;
}
for (int i = 0; i < criteria.size(); i++) {
data[window][i][START_TIME] = now;
Criterion criterion = criteria.get(i);
if (criterion.delta) {
data[window][i][STARTS] = criterion.supplier.getAsDouble();
} else {
data[window][i][STARTS] = Double.NaN;
}
criterion.callback.run();
}
for (Criterion criterion : criteria) {
criterion.callback.run();
}
}
public void stopWindow() {
stopWindow(System.currentTimeMillis());
}
public void stopWindow(long now) {
for (int i = 0; i < criteria.size(); i++) {
data[window][i][END_TIME] = now;
Criterion criterion = criteria.get(i);
double endmark = criterion.supplier.getAsDouble();
data[window][i][ENDS] = endmark;
double sample = valueOf(i);
data[window][i][WEIGHTED] = sample* criterion.weight;
}
openWindow=false;
}
public static record Criterion(
String name,
DoubleSupplier supplier,
double weight,
Runnable callback,
boolean delta
) { }
}

View File

@@ -18,7 +18,9 @@ package io.nosqlbench.scenarios;
*/ */
import io.nosqlbench.api.engine.metrics.instruments.NBMetric; import io.nosqlbench.api.engine.metrics.ConvenientSnapshot;
import io.nosqlbench.api.engine.metrics.DeltaSnapshotReader;
import io.nosqlbench.api.engine.metrics.instruments.NBMetricTimer;
import io.nosqlbench.api.optimizers.BobyqaOptimizerInstance; import io.nosqlbench.api.optimizers.BobyqaOptimizerInstance;
import io.nosqlbench.api.optimizers.MVResult; import io.nosqlbench.api.optimizers.MVResult;
import io.nosqlbench.components.NBComponent; import io.nosqlbench.components.NBComponent;
@@ -27,12 +29,15 @@ import io.nosqlbench.engine.core.lifecycle.scenario.direct.SCBaseScenario;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ToDoubleFunction; import java.util.function.ToDoubleFunction;
public class SC_optimo extends SCBaseScenario { public class SC_optimo extends SCBaseScenario {
private final static Logger logger = LogManager.getLogger(SC_optimo.class); private final static Logger logger = LogManager.getLogger(SC_optimo.class);
public SC_optimo(NBComponent parentComponent, String scenarioName) { public SC_optimo(NBComponent parentComponent, String scenarioName) {
super(parentComponent, scenarioName); super(parentComponent, scenarioName);
} }
@@ -57,38 +62,67 @@ public class SC_optimo extends SCBaseScenario {
logger.warn("You provided neither a workload nor an op, so assuming diagnostic mode."); logger.warn("You provided neither a workload nor an op, so assuming diagnostic mode.");
} }
Activity flywheel = controller.start(activityParams); int seconds = params.containsKey("window") ? Integer.parseInt(params.get("window")) : 5;
BobyqaOptimizerInstance bobby = create().bobyqaOptimizer(); BobyqaOptimizerInstance bobby = create().bobyqaOptimizer();
bobby.param("rate", 1.0d, 10000.d);
bobby.param("threads", 0.0d, 200000.0d); bobby.param("threads", 1.0d, 1000.0d);
bobby.param("rate", 0.0d, 1_000_000.d);
bobby.setInitialRadius(10000.0).setStoppingRadius(0.001).setMaxEval(1000); bobby.setInitialRadius(10000.0).setStoppingRadius(0.001).setMaxEval(1000);
Activity flywheel = controller.start(activityParams);
stdout.println("warming up for " + seconds + " seconds");
controller.waitMillis(5000);
/** /**
* <P>This function is the objective function, and is responsible for applying * <P>This function is the objective function, and is responsible for applying
* the parameters and yielding a result. The higher the returned result, the * the parameters and yielding a result. The higher the returned result, the
* better the parameters are.</P> * better the parameters are.</P>
* <P>The parameter values will be passed in as an array, pair-wise with the param calls above.</P> * <P>The parameter values will be passed in as an array, pair-wise with the param calls above.</P>
*/ */
PerfWindowSampler sampler = new PerfWindowSampler();
NBMetricTimer result_success_timer = flywheel.find().timer("name:ressult_success");
sampler.addDeltaTime("achieved_rate", result_success_timer::getCount, 1.0);
final DeltaSnapshotReader snapshotter = result_success_timer.getDeltaReader();
AtomicReference<ConvenientSnapshot> snapshot = new AtomicReference<>(snapshotter.getDeltaSnapshot());
ValidAtOrBelow below15000 = ValidAtOrBelow.max(15000);
sampler.addDirect(
"p99latency",
() -> below15000.applyAsDouble(snapshot.get().getP99ns()),
-1.0,
() -> snapshot.set(snapshotter.getDeltaSnapshot())
);
sampler.startWindow();
ToDoubleFunction<double[]> f = new ToDoubleFunction<double[]>() { ToDoubleFunction<double[]> f = new ToDoubleFunction<double[]>() {
@Override @Override
public double applyAsDouble(double[] value) { public double applyAsDouble(double[] values) {
int threads=(int)value[0]; stdout.println("params=" + Arrays.toString(values));
int threads = (int) bobby.getParams().getValue("threads", values);
NBMetric counter = flywheel.find().counter("counterstuff"); double rate = bobby.getParams().getValue("rate", values);
stdout.println("setting threads to " + threads);
flywheel.getActivityDef().setThreads(threads); flywheel.getActivityDef().setThreads(threads);
double rate=value[1]; String ratespec = rate + ":1.1:restart";
flywheel.getActivityDef().setCycles(String.valueOf(rate)); stdout.println("setting rate to " + ratespec);
flywheel.getActivityDef().getParams().put("rate", ratespec);
sampler.startWindow();
stdout.println("waiting " + seconds + " seconds...");
controller.waitMillis(seconds * 1000L);
sampler.stopWindow();
double value = sampler.getCurrentWindowValue();
stdout.println(sampler.toString());
return value;
return 10000000 - ((Math.abs(100-value[0])) + (Math.abs(100-value[1])));
} }
}; };
bobby.setObjectiveFunction(f); bobby.setObjectiveFunction(f);
MVResult result = bobby.optimize(); MVResult result = bobby.optimize();
controller.stop(flywheel);
stdout.println("optimized result was " + result); stdout.println("optimized result was " + result);
stdout.println("map of result was " + result.getMap()); stdout.println("map of result was " + result.getMap());

View File

@@ -0,0 +1,31 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.scenarios;
import java.util.function.DoubleSupplier;
public class Uniform extends BasePerfDimension {
public Uniform(double weight, DoubleSupplier supplier, String name) {
super(name, weight, Weighting.uniform, supplier);
}
@Override
public String getValue() {
return null;
}
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.scenarios;
import jakarta.validation.Valid;
import java.util.function.DoubleUnaryOperator;
public class ValidAtOrAbove implements DoubleUnaryOperator {
public ValidAtOrAbove(double threshold, double defaultValue) {
this.threshold = threshold;
this.defaultValue = defaultValue;
}
private double threshold;
private double defaultValue;
@Override
public double applyAsDouble(double operand) {
if (operand>=threshold) {
return operand;
} else {
return defaultValue;
}
}
public static ValidAtOrAbove min(double min) {
return new ValidAtOrAbove(min,0.0d);
}
}

View File

@@ -0,0 +1,43 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.scenarios;
import java.util.function.DoubleUnaryOperator;
public class ValidAtOrBelow implements DoubleUnaryOperator {
public ValidAtOrBelow(double threshold, double defaultValue) {
this.threshold = threshold;
this.defaultValue = defaultValue;
}
private double threshold;
private double defaultValue;
@Override
public double applyAsDouble(double operand) {
if (operand<=threshold) {
return operand;
} else {
return defaultValue;
}
}
public static ValidAtOrBelow max(double max) {
return new ValidAtOrBelow(max,0.0d);
}
}

View File

@@ -0,0 +1,45 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.scenarios;
public class ValueFuncs {
public static double zeroBelow(double value, double threshold) {
if (value<threshold) {
return 0.0d;
}
return value;
}
public static double zeroAbove(double value, double threshold) {
if (value>threshold) {
return 0.0d;
}
return value;
}
/**
* Apply exponential weighting to the value base 2. For rate=1.0, the weight
*/
public static double exp_2(double value) {
return (value*value)+1;
}
public static double exp_e(double value) {
return Math.exp(value);
}
}

View File

@@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2022-2023 nosqlbench * Copyright (c) 2023 nosqlbench
* *
* Licensed under the Apache License, Version 2.0 (the "License"); * Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License. * you may not use this file except in compliance with the License.
@@ -14,9 +14,13 @@
* limitations under the License. * limitations under the License.
*/ */
package io.nosqlbench.engine.api.activityapi.core; package io.nosqlbench.scenarios;
public interface ActivityController { public enum Weighting {
void stopActivityWithReasonAsync(String reason); uniform;
void stopActivityWithErrorAsync(Throwable throwable); public double applyWeight(double input) {
return switch (this) {
case uniform -> input;
};
}
} }

View File

@@ -1,36 +0,0 @@
package io.nosqlbench.scenarios;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import io.nosqlbench.components.NBComponent;
public class WindowSampler {
private final NBComponent base;
public WindowSampler(NBComponent component) {
this.base = component;
component.find().metric("doesnot=exist");
}
public Sample sample() {
return new Sample(1.0d,2.0d);
}
public static record Sample(double rate, double p99) { }
}

View File

@@ -0,0 +1,72 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.scenarios;
import org.assertj.core.data.Offset;
import org.junit.jupiter.api.Test;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.DoubleSupplier;
import static org.assertj.core.api.Assertions.assertThat;
class PerfWindowSamplerTest {
@Test
public void testBasicValues() {
PerfWindowSampler pws = new PerfWindowSampler();
pws.addDirect("a",() -> 1.0d, 1.0d);
pws.addDirect("b",()-> 3.0d, 3.0d);
pws.startWindow();
pws.stopWindow();
double value = pws.getCurrentWindowValue();
assertThat(value).isCloseTo(9.0, Offset.offset(0.002));
}
@Test
public void testDeltaValues() {
AtomicLong a1 = new AtomicLong(0);
DoubleSupplier ds1 = () -> (double) a1.get();
AtomicLong a2 = new AtomicLong(0);
DoubleSupplier ds2 = () -> (double) a2.get();
PerfWindowSampler pws = new PerfWindowSampler();
pws.addDeltaTime("a",ds1, 1.0d);
pws.addDeltaTime("b",ds2, 1.0d);
pws.startWindow(0L);
a1.set(3L);
a2.set(10L);
pws.stopWindow(1000L);
double value = pws.getCurrentWindowValue();
assertThat(value).isCloseTo(30.0,Offset.offset(0.001));
pws.startWindow(10000L);
a1.set(42); // 42-10=32
a2.set(42); // 42-1=41; 41+32=73
pws.stopWindow(11000L);
double value2 = pws.getCurrentWindowValue();
assertThat(value2).isCloseTo(1248.0,Offset.offset(0.001));
}
}