these changes are not permanent

This commit is contained in:
Jonathan Shook 2023-11-02 17:35:27 -05:00
parent 082c72afe0
commit 1e32973e47
8 changed files with 86 additions and 40 deletions

View File

@ -35,7 +35,7 @@ public class ClientSystemMetricChecker extends NBBaseComponent {
private List<ClientMetric> clientMetrics;
public ClientSystemMetricChecker(NBComponent parent, NBLabels additionalLabels, int pollIntervalSeconds) {
super(parent,additionalLabels);
super(parent,additionalLabels.and("_type","client-metrics"));
this.pollIntervalSeconds = pollIntervalSeconds;
this.scheduler = Executors.newScheduledThreadPool(1);
this.clientMetrics = new ArrayList<>();

View File

@ -242,7 +242,7 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener
* @param activityDef
* the activityDef for this activity instance
*/
private void adjustMotorCountToThreadParam(ActivityDef activityDef) {
private void adjustMotorCountToThreadParam(ActivityDef activityDef) { // TODO: Ensure that threads are allowed to complete their current op gracefully
logger.trace(() -> ">-pre-adjust->" + getSlotStatus());
reduceActiveMotorCountDownToThreadParam(activityDef);

View File

@ -18,10 +18,6 @@ package io.nosqlbench.optimizers;
import io.nosqlbench.api.optimizers.MVLogger;
import org.apache.commons.math4.legacy.analysis.MultivariateFunction;
//import org.apache.commons.math3.optim.*;
//import org.apache.commons.math3.optim.nonlinear.scalar.GoalType;
//import org.apache.commons.math3.optim.nonlinear.scalar.ObjectiveFunction;
//import org.apache.commons.math3.optim.nonlinear.scalar.noderiv.BOBYQAOptimizer;
import org.apache.commons.math4.legacy.exception.MathIllegalStateException;
import org.apache.commons.math4.legacy.optim.*;
import org.apache.commons.math4.legacy.optim.nonlinear.scalar.GoalType;

View File

@ -50,7 +50,7 @@ public class PromPushReporterComponent extends PeriodicTaskComponent {
private String bearerToken;
public PromPushReporterComponent(NBComponent parent, URI endpoint, long intervalMs, NBLabels nbLabels) {
super(parent,nbLabels,intervalMs,true);
super(parent,nbLabels.and("_type","prom-push"),intervalMs,true);
this.uri = endpoint;
this.keyfilePath = NBEnvironment.INSTANCE
.interpolateWithTimestamp("$NBSTATEDIR/prompush/prompush_apikey", System.currentTimeMillis())

View File

@ -58,7 +58,10 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone
for (NBComponent child : children) {
logger.debug(() -> "attaching " + child.description() + " to parent " + this.description());
for (NBComponent extant : this.children) {
if (!child.getComponentOnlyLabels().isEmpty() && child.getComponentOnlyLabels().equals(extant.getComponentOnlyLabels())) {
NBLabels eachLabels = extant.getComponentOnlyLabels();
NBLabels newLabels = child.getComponentOnlyLabels();
if (eachLabels!=null && newLabels!=null && !eachLabels.isEmpty() && !newLabels.isEmpty() && child.getComponentOnlyLabels().equals(extant.getComponentOnlyLabels())) {
throw new RuntimeException("Adding second child under already-defined labels is not allowed:\n" +
" extant: (" + extant.getClass().getSimpleName() + ") " + extant.description() + "\n" +
" adding: (" + child.getClass().getSimpleName() + ") " + child.description());

View File

@ -25,7 +25,7 @@ public record OptimoSearchSettings(
) {
public OptimoSearchSettings(ScenarioParams params, OptimoParamModel model) {
this(
params.maybeGet("sample_time_ms").map(Long::parseLong).orElse(1000L),
params.maybeGet("sample_time_ms").map(Long::parseLong).orElse(5000L),
model
);
}

View File

@ -25,6 +25,7 @@ import io.nosqlbench.components.events.SetThreads;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.CycleRateSpec;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRateSpec;
import io.nosqlbench.engine.core.lifecycle.scenario.context.ScenarioParams;
import io.nosqlbench.engine.core.lifecycle.scenario.direct.SCBaseScenario;
import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBScenario;
import io.nosqlbench.nb.annotations.Service;
@ -44,7 +45,13 @@ import org.apache.logging.log4j.Logger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.LockSupport;
/**
* TODO: Add sanity checks for a valid run, which inform users of too-high error rates or similar problems,
* and refuse to run without overrides.
*/
@Service(value = NBScenario.class, selector = "optimo")
public class SC_optimo extends SCBaseScenario {
private final static Logger logger = LogManager.getLogger(SC_findmax.class);
@ -59,36 +66,64 @@ public class SC_optimo extends SCBaseScenario {
String workload = params.getOrDefault("workload", "default_workload");
CycleRateSpec ratespec = new CycleRateSpec(100.0, 1.05);
Map<String, String> activityParams = new HashMap<>(Map.of(
"cycles", String.valueOf(Long.MAX_VALUE),
"threads", params.getOrDefault("threads", "1"),
"driver", "diag",
"rate", String.valueOf(ratespec.opsPerSec),
"dryrun", "op"
));
if (params.containsKey("workload")) {
activityParams.put("workload", params.get("workload"));
} else if (params.containsKey("op")) {
activityParams.put("op", params.get("op"));
} else {
activityParams.put("op", "log: level=info");
logger.warn("You provided neither a workload nor an op, so assuming diagnostic mode.");
}
// Map<String, String> defaultActivityParams = new HashMap<>(Map.of(
// "cycles", String.valueOf(Long.MAX_VALUE),
// "threads", params.getOrDefault("threads", "1"),
// "driver", "diag",
// "rate", String.valueOf(ratespec.opsPerSec),
// "dryrun", "op"
// ));
// ScenarioParams customParams = params.withDefaults(defaultActivityParams);
//
// if (params.containsKey("workload")) {
// activityParams.put("workload", params.get("workload"));
// } else if (params.containsKey("op")) {
// activityParams.put("op", params.get("op"));
// } else {
// activityParams.put("op", "log: level=info");
// logger.warn("You provided neither a workload nor an op, so assuming diagnostic mode.");
// }
var activityParams = params.withOverrides(
new HashMap<String, String>() {{
put("cycles", String.valueOf(Long.MAX_VALUE));
put("rate", "10");
put("threads", "1");
put("tags", "block:testann");
put("errors", "count,retry,warn");
put("maxtries", "2");
}}
);
activityParams.remove("main_class");
Activity flywheel = controller.start(activityParams);
NBMetricTimer result_success_timer = flywheel.find().timer("name:result_success");
for (int i = 0; i < 1000; i++) {
if (result_success_timer.getCount()>0) {
System.out.println("saw traffic at loop " + i);
break;
}
LockSupport.parkNanos(10_000_000);
}
if (result_success_timer.getCount()==0) {
throw new RuntimeException("Activity was not processing cycles after waiting 10seconds");
}
// await flywheel actually spinning, or timeout with error
SimFrameCapture capture = this.perfValueMeasures(flywheel);
SimFrameJournal<OptimoFrameParams> journal = new SimFrameJournal<>();
OptimoParamModel model = new OptimoParamModel();
model.add("rate",100,1000,10_000_000,
model.add("rate", 1, 10, 1000,
rate -> flywheel.onEvent(ParamChange.of(new CycleRateSpec(rate, 1.1d, SimRateSpec.Verb.restart)))
);
model.add("threads",1,10,10_000,
model.add("threads", 10, 20, 100,
threads -> flywheel.onEvent(ParamChange.of(new SetThreads((int) threads)))
);
OptimoSearchSettings optimoSearchParams = new OptimoSearchSettings(params,model);
SimFrameFunction frameFunction = new OptimoFrameFunction(controller,optimoSearchParams,flywheel,capture,journal);
OptimoSearchSettings optimoSearchParams = new OptimoSearchSettings(params, model);
SimFrameFunction frameFunction = new OptimoFrameFunction(controller, optimoSearchParams, flywheel, capture, journal);
List<OptimizationData> od = List.of(
@ -160,7 +195,7 @@ public class SC_optimo extends SCBaseScenario {
result = mo.optimize(od.toArray(new OptimizationData[od.size()]));
} catch (MathIllegalStateException missed) {
if (missed.getMessage().contains("trust region step has failed to reduce Q")) {
logger.warn(missed.getMessage()+", so returning current result.");
logger.warn(missed.getMessage() + ", so returning current result.");
result = new PointValuePair(journal.last().params().paramValues(), journal.last().value());
} else {
throw missed;
@ -191,24 +226,34 @@ public class SC_optimo extends SCBaseScenario {
sampler.addRemix("achieved_success_ratio", vars -> {
// exponentially penalize results which do not attain 100% successful op rate
if (vars.get("achieved_oprate")==0.0d) {
return 0d;
}
double basis = Math.min(1.0d, vars.get("achieved_ok_oprate") / vars.get("achieved_oprate"));
return Math.pow(basis, 2);
return basis;
// return Math.pow(basis, 2);
});
sampler.addRemix("achieved_target_ratio", (vars) -> {
// exponentially penalize results which do not attain 100% target rate
double basis = Math.min(1.0d, vars.get("achieved_ok_oprate") / vars.get("target_rate"));
return Math.pow(basis, 2);
return basis;
// return Math.pow(basis, 2);
});
sampler.addRemix("retries_p99", (vars) -> {
double triesP99 = tries_histo.getDeltaSnapshot(90).get99thPercentile();
return 1/triesP99;
if (Double.isNaN(triesP99)||Double.isInfinite(triesP99)||triesP99==0.0d) {
logger.warn("invalid value for retries_p99, skipping as identity for now");
return 1.0d;
}
return 1 / triesP99;
});
sampler.addDirect("latency_cutoff_50", () -> {
double latencyP99 = latency_histo.getDeltaSnapshot(90).getP99ms();
double v = StatFunctions.sigmoidE4LowPass(latencyP99, 50);
// System.out.println("v:"+v+" p99ms:" + latencyP99);
return v;
},1.0d);
return 1.0d;
// return v;
}, 1.0d);
return sampler;
}
}

View File

@ -68,7 +68,7 @@ public class StabilityDetector implements Runnable {
}
private void reset() {
detectionTime=-1L;
detectionTime = -1L;
this.buckets = new StatBucket[windows.length];
for (int i = 0; i < windows.length; i++) {
buckets[i] = new StatBucket(windows[i]);
@ -108,6 +108,8 @@ public class StabilityDetector implements Runnable {
basis *= reductionFactor;
}
// TODO: investigate why we get NaN sometimes and what it means for stability checks
// TODO: turn this into a one-line summary with some cool unicode characters
// double time = ((double)(nextCheckAt - startedAt))/1000d;
// System.out.printf("% 4.1fS STABILITY %g :", time, basis);
// for (int i = 0; i < stddev.length; i++) {
@ -144,19 +146,19 @@ public class StabilityDetector implements Runnable {
double stabilityFactor = computeStability();
if (stabilityFactor > threshold) {
detectionTime = ((double)(nextCheckAt - startedAt))/1000d;
detectionTime = ((double) (nextCheckAt - startedAt)) / 1000d;
return;
}
nextCheckAt+=interval;
nextCheckAt += interval;
}
}
@Override
public String toString() {
if (detectionTime>0L) {
return String.format("stability converged in % 4.2fS",detectionTime);
if (detectionTime > 0L) {
return String.format("stability converged in % 4.2fS", detectionTime);
} else {
return String.format("awaiting stability for % 4.2fS",(((double)(nextCheckAt - startedAt))/1000d));
return String.format("awaiting stability for % 4.2fS", (((double) (nextCheckAt - startedAt)) / 1000d));
}
}
}