diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/clientload/ClientSystemMetricChecker.java b/engine-core/src/main/java/io/nosqlbench/engine/core/clientload/ClientSystemMetricChecker.java index 482fe4226..457ae0600 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/clientload/ClientSystemMetricChecker.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/clientload/ClientSystemMetricChecker.java @@ -35,7 +35,7 @@ public class ClientSystemMetricChecker extends NBBaseComponent { private List clientMetrics; public ClientSystemMetricChecker(NBComponent parent, NBLabels additionalLabels, int pollIntervalSeconds) { - super(parent,additionalLabels); + super(parent,additionalLabels.and("_type","client-metrics")); this.pollIntervalSeconds = pollIntervalSeconds; this.scheduler = Executors.newScheduledThreadPool(1); this.clientMetrics = new ArrayList<>(); diff --git a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java index f44515f5a..603d6de90 100644 --- a/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java +++ b/engine-core/src/main/java/io/nosqlbench/engine/core/lifecycle/activity/ActivityExecutor.java @@ -242,7 +242,7 @@ public class ActivityExecutor implements NBLabeledElement, ParameterMap.Listener * @param activityDef * the activityDef for this activity instance */ - private void adjustMotorCountToThreadParam(ActivityDef activityDef) { + private void adjustMotorCountToThreadParam(ActivityDef activityDef) { // TODO: Ensure that threads area allowed to complete their current op gracefully logger.trace(() -> ">-pre-adjust->" + getSlotStatus()); reduceActiveMotorCountDownToThreadParam(activityDef); diff --git a/engine-extensions/src/test/java/io/nosqlbench/optimizers/TestOptimoExperiments2.java b/engine-extensions/src/test/java/io/nosqlbench/optimizers/TestOptimoExperiments2.java index b88fb4b7b..25ea9bed4 100644 --- a/engine-extensions/src/test/java/io/nosqlbench/optimizers/TestOptimoExperiments2.java +++ b/engine-extensions/src/test/java/io/nosqlbench/optimizers/TestOptimoExperiments2.java @@ -18,10 +18,6 @@ package io.nosqlbench.optimizers; import io.nosqlbench.api.optimizers.MVLogger; import org.apache.commons.math4.legacy.analysis.MultivariateFunction; -//import org.apache.commons.math3.optim.*; -//import org.apache.commons.math3.optim.nonlinear.scalar.GoalType; -//import org.apache.commons.math3.optim.nonlinear.scalar.ObjectiveFunction; -//import org.apache.commons.math3.optim.nonlinear.scalar.noderiv.BOBYQAOptimizer; import org.apache.commons.math4.legacy.exception.MathIllegalStateException; import org.apache.commons.math4.legacy.optim.*; import org.apache.commons.math4.legacy.optim.nonlinear.scalar.GoalType; diff --git a/nb-api/src/main/java/io/nosqlbench/api/engine/metrics/reporters/PromPushReporterComponent.java b/nb-api/src/main/java/io/nosqlbench/api/engine/metrics/reporters/PromPushReporterComponent.java index 744b22397..bef70751d 100644 --- a/nb-api/src/main/java/io/nosqlbench/api/engine/metrics/reporters/PromPushReporterComponent.java +++ b/nb-api/src/main/java/io/nosqlbench/api/engine/metrics/reporters/PromPushReporterComponent.java @@ -50,7 +50,7 @@ public class PromPushReporterComponent extends PeriodicTaskComponent { private String bearerToken; public PromPushReporterComponent(NBComponent parent, URI endpoint, long intervalMs, NBLabels nbLabels) { - super(parent,nbLabels,intervalMs,true); + super(parent,nbLabels.and("_type","prom-push"),intervalMs,true); this.uri = endpoint; this.keyfilePath = NBEnvironment.INSTANCE .interpolateWithTimestamp("$NBSTATEDIR/prompush/prompush_apikey", System.currentTimeMillis()) diff --git a/nb-api/src/main/java/io/nosqlbench/components/NBBaseComponent.java b/nb-api/src/main/java/io/nosqlbench/components/NBBaseComponent.java index 0abc57e7d..7fee03584 100644 --- a/nb-api/src/main/java/io/nosqlbench/components/NBBaseComponent.java +++ b/nb-api/src/main/java/io/nosqlbench/components/NBBaseComponent.java @@ -58,7 +58,10 @@ public class NBBaseComponent extends NBBaseComponentMetrics implements NBCompone for (NBComponent child : children) { logger.debug(() -> "attaching " + child.description() + " to parent " + this.description()); for (NBComponent extant : this.children) { - if (!child.getComponentOnlyLabels().isEmpty() && child.getComponentOnlyLabels().equals(extant.getComponentOnlyLabels())) { + NBLabels eachLabels = extant.getComponentOnlyLabels(); + NBLabels newLabels = child.getComponentOnlyLabels(); + + if (eachLabels!=null && newLabels!=null && !eachLabels.isEmpty() && !newLabels.isEmpty() && child.getComponentOnlyLabels().equals(extant.getComponentOnlyLabels())) { throw new RuntimeException("Adding second child under already-defined labels is not allowed:\n" + " extant: (" + extant.getClass().getSimpleName() + ") " + extant.description() + "\n" + " adding: (" + child.getClass().getSimpleName() + ") " + child.description()); diff --git a/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimo/OptimoSearchSettings.java b/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimo/OptimoSearchSettings.java index e03f4321d..09e15adab 100644 --- a/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimo/OptimoSearchSettings.java +++ b/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimo/OptimoSearchSettings.java @@ -25,7 +25,7 @@ public record OptimoSearchSettings( ) { public OptimoSearchSettings(ScenarioParams params, OptimoParamModel model) { this( - params.maybeGet("sample_time_ms").map(Long::parseLong).orElse(1000L), + params.maybeGet("sample_time_ms").map(Long::parseLong).orElse(5000L), model ); } diff --git a/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimo/SC_optimo.java b/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimo/SC_optimo.java index 859ba866a..fb9d3a931 100644 --- a/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimo/SC_optimo.java +++ b/nbr/src/main/java/io/nosqlbench/scenarios/simframe/optimo/SC_optimo.java @@ -25,6 +25,7 @@ import io.nosqlbench.components.events.SetThreads; import io.nosqlbench.engine.api.activityapi.core.Activity; import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.CycleRateSpec; import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRateSpec; +import io.nosqlbench.engine.core.lifecycle.scenario.context.ScenarioParams; import io.nosqlbench.engine.core.lifecycle.scenario.direct.SCBaseScenario; import io.nosqlbench.engine.core.lifecycle.scenario.execution.NBScenario; import io.nosqlbench.nb.annotations.Service; @@ -44,7 +45,13 @@ import org.apache.logging.log4j.Logger; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.locks.LockSupport; + +/** + * // TODO: Add sanity checks for a valid run, which inform the users with too-high error rates, or similar, and + * // refuse to run without overrides. + */ @Service(value = NBScenario.class, selector = "optimo") public class SC_optimo extends SCBaseScenario { private final static Logger logger = LogManager.getLogger(SC_findmax.class); @@ -59,36 +66,64 @@ public class SC_optimo extends SCBaseScenario { String workload = params.getOrDefault("workload", "default_workload"); CycleRateSpec ratespec = new CycleRateSpec(100.0, 1.05); - Map activityParams = new HashMap<>(Map.of( - "cycles", String.valueOf(Long.MAX_VALUE), - "threads", params.getOrDefault("threads", "1"), - "driver", "diag", - "rate", String.valueOf(ratespec.opsPerSec), - "dryrun", "op" - )); - if (params.containsKey("workload")) { - activityParams.put("workload", params.get("workload")); - } else if (params.containsKey("op")) { - activityParams.put("op", params.get("op")); - } else { - activityParams.put("op", "log: level=info"); - logger.warn("You provided neither a workload nor an op, so assuming diagnostic mode."); - } +// Map defaultActivityParams = new HashMap<>(Map.of( +// "cycles", String.valueOf(Long.MAX_VALUE), +// "threads", params.getOrDefault("threads", "1"), +// "driver", "diag", +// "rate", String.valueOf(ratespec.opsPerSec), +// "dryrun", "op" +// )); +// ScenarioParams customParams = params.withDefaults(defaultActivityParams); +// +// if (params.containsKey("workload")) { +// activityParams.put("workload", params.get("workload")); +// } else if (params.containsKey("op")) { +// activityParams.put("op", params.get("op")); +// } else { +// activityParams.put("op", "log: level=info"); +// logger.warn("You provided neither a workload nor an op, so assuming diagnostic mode."); +// } + + var activityParams = params.withOverrides( + new HashMap() {{ + put("cycles", String.valueOf(Long.MAX_VALUE)); + put("rate", "10"); + put("threads", "1"); + put("tags", "block:testann"); + put("errors", "count,retry,warn"); + put("maxtries", "2"); + }} + ); + activityParams.remove("main_class"); Activity flywheel = controller.start(activityParams); + + NBMetricTimer result_success_timer = flywheel.find().timer("name:result_success"); + for (int i = 0; i < 1000; i++) { + if (result_success_timer.getCount()>0) { + System.out.println("saw traffic at loop " + i); + break; + } + LockSupport.parkNanos(10_000_000); + } + if (result_success_timer.getCount()==0) { + throw new RuntimeException("Activity was not processing cycles after waiting 10seconds"); + } + + // await flywheel actually spinning, or timeout with error SimFrameCapture capture = this.perfValueMeasures(flywheel); SimFrameJournal journal = new SimFrameJournal<>(); OptimoParamModel model = new OptimoParamModel(); - model.add("rate",100,1000,10_000_000, + model.add("rate", 1, 10, 1000, rate -> flywheel.onEvent(ParamChange.of(new CycleRateSpec(rate, 1.1d, SimRateSpec.Verb.restart))) ); - model.add("threads",1,10,10_000, + model.add("threads", 10, 20, 100, threads -> flywheel.onEvent(ParamChange.of(new SetThreads((int) threads))) ); - OptimoSearchSettings optimoSearchParams = new OptimoSearchSettings(params,model); - SimFrameFunction frameFunction = new OptimoFrameFunction(controller,optimoSearchParams,flywheel,capture,journal); + OptimoSearchSettings optimoSearchParams = new OptimoSearchSettings(params, model); + SimFrameFunction frameFunction = new OptimoFrameFunction(controller, optimoSearchParams, flywheel, capture, journal); List od = List.of( @@ -160,7 +195,7 @@ public class SC_optimo extends SCBaseScenario { result = mo.optimize(od.toArray(new OptimizationData[od.size()])); } catch (MathIllegalStateException missed) { if (missed.getMessage().contains("trust region step has failed to reduce Q")) { - logger.warn(missed.getMessage()+", so returning current result."); + logger.warn(missed.getMessage() + ", so returning current result."); result = new PointValuePair(journal.last().params().paramValues(), journal.last().value()); } else { throw missed; @@ -191,24 +226,34 @@ public class SC_optimo extends SCBaseScenario { sampler.addRemix("achieved_success_ratio", vars -> { // exponentially penalize results which do not attain 100% successful op rate + if (vars.get("achieved_oprate")==0.0d) { + return 0d; + } double basis = Math.min(1.0d, vars.get("achieved_ok_oprate") / vars.get("achieved_oprate")); - return Math.pow(basis, 2); + return basis; +// return Math.pow(basis, 2); }); sampler.addRemix("achieved_target_ratio", (vars) -> { // exponentially penalize results which do not attain 100% target rate double basis = Math.min(1.0d, vars.get("achieved_ok_oprate") / vars.get("target_rate")); - return Math.pow(basis, 2); + return basis; +// return Math.pow(basis, 2); }); sampler.addRemix("retries_p99", (vars) -> { double triesP99 = tries_histo.getDeltaSnapshot(90).get99thPercentile(); - return 1/triesP99; + if (Double.isNaN(triesP99)||Double.isInfinite(triesP99)||triesP99==0.0d) { + logger.warn("invalid value for retries_p99, skipping as identity for now"); + return 1.0d; + } + return 1 / triesP99; }); sampler.addDirect("latency_cutoff_50", () -> { double latencyP99 = latency_histo.getDeltaSnapshot(90).getP99ms(); double v = StatFunctions.sigmoidE4LowPass(latencyP99, 50); // System.out.println("v:"+v+" p99ms:" + latencyP99); - return v; - },1.0d); + return 1.0d; +// return v; + }, 1.0d); return sampler; } } diff --git a/nbr/src/main/java/io/nosqlbench/scenarios/simframe/stabilization/StabilityDetector.java b/nbr/src/main/java/io/nosqlbench/scenarios/simframe/stabilization/StabilityDetector.java index 6d80b3c7e..12f4b4e20 100644 --- a/nbr/src/main/java/io/nosqlbench/scenarios/simframe/stabilization/StabilityDetector.java +++ b/nbr/src/main/java/io/nosqlbench/scenarios/simframe/stabilization/StabilityDetector.java @@ -68,7 +68,7 @@ public class StabilityDetector implements Runnable { } private void reset() { - detectionTime=-1L; + detectionTime = -1L; this.buckets = new StatBucket[windows.length]; for (int i = 0; i < windows.length; i++) { buckets[i] = new StatBucket(windows[i]); @@ -108,6 +108,8 @@ public class StabilityDetector implements Runnable { basis *= reductionFactor; } + // TODO: investigate why we get NaN sometimes and what it means for stability checks + // TODO: turn this into a one line summary with some cool unicode characters // double time = ((double)(nextCheckAt - startedAt))/1000d; // System.out.printf("% 4.1fS STABILITY %g :", time, basis); // for (int i = 0; i < stddev.length; i++) { @@ -144,19 +146,19 @@ public class StabilityDetector implements Runnable { double stabilityFactor = computeStability(); if (stabilityFactor > threshold) { - detectionTime = ((double)(nextCheckAt - startedAt))/1000d; + detectionTime = ((double) (nextCheckAt - startedAt)) / 1000d; return; } - nextCheckAt+=interval; + nextCheckAt += interval; } } @Override public String toString() { - if (detectionTime>0L) { - return String.format("stability converged in % 4.2fS",detectionTime); + if (detectionTime > 0L) { + return String.format("stability converged in % 4.2fS", detectionTime); } else { - return String.format("awaiting stability for % 4.2fS",(((double)(nextCheckAt - startedAt))/1000d)); + return String.format("awaiting stability for % 4.2fS", (((double) (nextCheckAt - startedAt)) / 1000d)); } } }