mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
findmax improvements
This commit is contained in:
@@ -20,6 +20,7 @@ import io.nosqlbench.scenarios.simframe.stabilization.StabilityDetector;
|
|||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.function.*;
|
import java.util.function.*;
|
||||||
|
|
||||||
@@ -39,7 +40,7 @@ public class SimFrameCapture implements SimFrameResults {
|
|||||||
private final StabilityDetector stabilizer;
|
private final StabilityDetector stabilizer;
|
||||||
|
|
||||||
public SimFrameCapture() {
|
public SimFrameCapture() {
|
||||||
stabilizer = new StabilityDetector(0.1,0.98,this::getPartialValue, 10,5);
|
stabilizer = new StabilityDetector(0.1,0.98,this::getPartialValue, this::toString, 10,5);
|
||||||
}
|
}
|
||||||
|
|
||||||
private double getPartialValue() {
|
private double getPartialValue() {
|
||||||
@@ -138,10 +139,11 @@ public class SimFrameCapture implements SimFrameResults {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public double getValue() {
|
public double getValue() {
|
||||||
if (allFrames.isEmpty()) {
|
FrameSampleSet lastFrame = allFrames.peekLast();
|
||||||
return Double.NaN;
|
if (lastFrame==null) {
|
||||||
|
System.out.println("no last frame");
|
||||||
}
|
}
|
||||||
return allFrames.getLast().value();
|
return lastFrame != null ? lastFrame.value() : Double.NaN;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@@ -152,7 +154,8 @@ public class SimFrameCapture implements SimFrameResults {
|
|||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
StringBuilder sb = new StringBuilder("PERF VALUE=").append(getValue()).append("\n");
|
StringBuilder sb = new StringBuilder("PERF VALUE=").append(getValue()).append("\n");
|
||||||
sb.append("windows:\n").append(allFrames.getLast().toString());
|
FrameSampleSet lastFrame = allFrames.peekLast();
|
||||||
|
sb.append("windows:\n").append(lastFrame==null ? "NONE" : lastFrame.toString());
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -175,6 +178,7 @@ public class SimFrameCapture implements SimFrameResults {
|
|||||||
throw new RuntimeException("cant start window twice in a row. Must close window first");
|
throw new RuntimeException("cant start window twice in a row. Must close window first");
|
||||||
}
|
}
|
||||||
restartWindow(now);
|
restartWindow(now);
|
||||||
|
// System.out.println("after (re)start):\n"+ frameCaptureSummary(activeFrame));
|
||||||
}
|
}
|
||||||
|
|
||||||
private String frameCaptureSummary(FrameSampleSet currentFrame) {
|
private String frameCaptureSummary(FrameSampleSet currentFrame) {
|
||||||
@@ -195,7 +199,7 @@ public class SimFrameCapture implements SimFrameResults {
|
|||||||
activeFrame.set(i, activeFrame.get(i).stop(now));
|
activeFrame.set(i, activeFrame.get(i).stop(now));
|
||||||
}
|
}
|
||||||
allFrames.add(activeFrame);
|
allFrames.add(activeFrame);
|
||||||
// System.out.println("after stop:\n"+ frameCaptureSummary(currentFrame));
|
// System.out.println("after stop:\n"+ frameCaptureSummary(activeFrame));
|
||||||
activeFrame = null;
|
activeFrame = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -212,7 +216,7 @@ public class SimFrameCapture implements SimFrameResults {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public static class FrameSamples extends ArrayList<FrameSampleSet> {
|
public static class FrameSamples extends LinkedList<FrameSampleSet> {
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -69,7 +69,7 @@ public class FindmaxRatchet extends SimFramePlanner<RatchetConfig, RatchetFrameP
|
|||||||
double newStepSize = best.params().step_size() * config.rate_scaledown();
|
double newStepSize = best.params().step_size() * config.rate_scaledown();
|
||||||
return new RatchetFrameParams(
|
return new RatchetFrameParams(
|
||||||
best.params().rate() + newStepSize, best.params().attempt(), newStepSize,
|
best.params().rate() + newStepSize, best.params().attempt(), newStepSize,
|
||||||
"SMALLER-STEP: " + newStepSize + " from fram " + best.index()
|
"SMALLER-STEP: " + newStepSize + " from frame " + best.index()
|
||||||
);
|
);
|
||||||
} else if (last.params().attempt() < config.max_attempts()) {
|
} else if (last.params().attempt() < config.max_attempts()) {
|
||||||
return new RatchetFrameParams(
|
return new RatchetFrameParams(
|
||||||
|
|||||||
@@ -38,8 +38,8 @@ public record RatchetConfig(
|
|||||||
params.maybeGet("sample_max").map(Integer::parseInt).orElse(10000),
|
params.maybeGet("sample_max").map(Integer::parseInt).orElse(10000),
|
||||||
params.maybeGet("sample_incr").map(Double::parseDouble).orElse(1.2d),
|
params.maybeGet("sample_incr").map(Double::parseDouble).orElse(1.2d),
|
||||||
params.maybeGet("rate_base").map(Double::parseDouble).orElse(5d),
|
params.maybeGet("rate_base").map(Double::parseDouble).orElse(5d),
|
||||||
params.maybeGet("rate_step").map(Double::parseDouble).orElse(1000d),
|
params.maybeGet("rate_step").map(Double::parseDouble).orElse(10000d),
|
||||||
params.maybeGet("rate_minstep").map(Double::parseDouble).orElse(1000d),
|
params.maybeGet("rate_minstep").map(Double::parseDouble).orElse(10000d),
|
||||||
params.maybeGet("rate_scaledown").map(Double::parseDouble).orElse(0.25),
|
params.maybeGet("rate_scaledown").map(Double::parseDouble).orElse(0.25),
|
||||||
params.maybeGet("max_attempts").map(Integer::parseInt).orElse(3)
|
params.maybeGet("max_attempts").map(Integer::parseInt).orElse(3)
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -70,10 +70,10 @@ public abstract class SimFramePlanner<C,P extends Record> {
|
|||||||
applyParams(frameParams,flywheel);
|
applyParams(frameParams,flywheel);
|
||||||
capture.startWindow();
|
capture.startWindow();
|
||||||
capture.awaitSteadyState();
|
capture.awaitSteadyState();
|
||||||
applyParams(frameParams,flywheel);
|
// applyParams(frameParams,flywheel);
|
||||||
capture.restartWindow();
|
// capture.restartWindow();
|
||||||
// controller.waitMillis(500);
|
//// controller.waitMillis(500);
|
||||||
capture.awaitSteadyState();
|
// capture.awaitSteadyState();
|
||||||
capture.stopWindow();
|
capture.stopWindow();
|
||||||
journal.record(frameParams, capture.last());
|
journal.record(frameParams, capture.last());
|
||||||
stdout.println(capture.last());
|
stdout.println(capture.last());
|
||||||
|
|||||||
@@ -23,12 +23,14 @@ import java.util.Arrays;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.OptionalDouble;
|
import java.util.OptionalDouble;
|
||||||
import java.util.function.DoubleSupplier;
|
import java.util.function.DoubleSupplier;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
public class StabilityDetector implements Runnable {
|
public class StabilityDetector implements Runnable {
|
||||||
private final static Logger logger = LogManager.getLogger(StabilityDetector.class);
|
private final static Logger logger = LogManager.getLogger(StabilityDetector.class);
|
||||||
private final double timeSliceSeconds;
|
private final double timeSliceSeconds;
|
||||||
private final double threshold;
|
private final double threshold;
|
||||||
private final DoubleSupplier source;
|
private final DoubleSupplier source;
|
||||||
|
private final Supplier<String> summary;
|
||||||
private StatBucket[] buckets;
|
private StatBucket[] buckets;
|
||||||
private int[] windows;
|
private int[] windows;
|
||||||
private volatile boolean running = true;
|
private volatile boolean running = true;
|
||||||
@@ -53,13 +55,20 @@ public class StabilityDetector implements Runnable {
|
|||||||
* The size of each window in the set of diminishing sizes. These contain the last N samples by size,
|
* The size of each window in the set of diminishing sizes. These contain the last N samples by size,
|
||||||
* respectively.
|
* respectively.
|
||||||
*/
|
*/
|
||||||
public StabilityDetector(double timeSliceSeconds, double minThreshold, DoubleSupplier source, int... windows) {
|
public StabilityDetector(
|
||||||
|
double timeSliceSeconds,
|
||||||
|
double minThreshold,
|
||||||
|
DoubleSupplier source,
|
||||||
|
Supplier<String> summary,
|
||||||
|
int... windows
|
||||||
|
) {
|
||||||
if (windows.length < 2) {
|
if (windows.length < 2) {
|
||||||
throw new RuntimeException("you must provide at least to summarization windows, ordered in decreasing size.");
|
throw new RuntimeException("you must provide at least to summarization windows, ordered in decreasing size.");
|
||||||
}
|
}
|
||||||
this.timeSliceSeconds = timeSliceSeconds;
|
this.timeSliceSeconds = timeSliceSeconds;
|
||||||
this.threshold = minThreshold;
|
this.threshold = minThreshold;
|
||||||
this.source = source;
|
this.source = source;
|
||||||
|
this.summary = summary;
|
||||||
this.windows = windows;
|
this.windows = windows;
|
||||||
for (int i = 0; i < windows.length - 1; i++) {
|
for (int i = 0; i < windows.length - 1; i++) {
|
||||||
if (windows[i] < windows[i + 1]) {
|
if (windows[i] < windows[i + 1]) {
|
||||||
@@ -122,6 +131,8 @@ public class StabilityDetector implements Runnable {
|
|||||||
for (int i = 0; i < stddev.length; i++) {
|
for (int i = 0; i < stddev.length; i++) {
|
||||||
System.out.printf("[%d]: %g ", windows[i], stddev[i]);
|
System.out.printf("[%d]: %g ", windows[i], stddev[i]);
|
||||||
}
|
}
|
||||||
|
System.out.println("stddevs: "+ Arrays.toString(stddev));
|
||||||
|
System.out.printf(this.summary.get());
|
||||||
System.out.println();
|
System.out.println();
|
||||||
}
|
}
|
||||||
return basis;
|
return basis;
|
||||||
@@ -133,6 +144,18 @@ public class StabilityDetector implements Runnable {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
|
try {
|
||||||
|
// System.out.println("Detector> OPEN");
|
||||||
|
updateAndAwait();
|
||||||
|
} catch (Exception e) {
|
||||||
|
// System.out.println("Detector> ERROR ERROR:" + e.toString());
|
||||||
|
throw new RuntimeException(e);
|
||||||
|
} finally {
|
||||||
|
// System.out.println("Detector> CLOSE");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateAndAwait() {
|
||||||
int interval = (int) (this.timeSliceSeconds * 1000);
|
int interval = (int) (this.timeSliceSeconds * 1000);
|
||||||
startedAt = System.currentTimeMillis();
|
startedAt = System.currentTimeMillis();
|
||||||
reset();
|
reset();
|
||||||
@@ -145,10 +168,10 @@ public class StabilityDetector implements Runnable {
|
|||||||
try {
|
try {
|
||||||
Thread.sleep(delay);
|
Thread.sleep(delay);
|
||||||
} catch (InterruptedException ignored) {
|
} catch (InterruptedException ignored) {
|
||||||
|
System.out.println("Interrupted>");
|
||||||
}
|
}
|
||||||
delay = nextCheckAt - System.currentTimeMillis();
|
delay = nextCheckAt - System.currentTimeMillis();
|
||||||
}
|
}
|
||||||
|
|
||||||
double value = source.getAsDouble();
|
double value = source.getAsDouble();
|
||||||
apply(value);
|
apply(value);
|
||||||
double stabilityFactor = computeStability();
|
double stabilityFactor = computeStability();
|
||||||
@@ -164,7 +187,7 @@ public class StabilityDetector implements Runnable {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String levels8 = " ▁▂▃▄▅▆▇";
|
private static final String levels8 = " ▁▂▃▄▅▆▇";
|
||||||
public String stabilitySummary(double[] stddev) {
|
public String stabilitySummary(double[] stddev) {
|
||||||
StringBuilder sb = new StringBuilder("[");
|
StringBuilder sb = new StringBuilder("[");
|
||||||
double bias=(1.0d/16.0);
|
double bias=(1.0d/16.0);
|
||||||
|
|||||||
@@ -35,6 +35,7 @@ public final class StatBucket {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public StatBucket apply(double value) {
|
public StatBucket apply(double value) {
|
||||||
|
// System.out.println("stat->" + value + " bucket:" + toString());
|
||||||
double popped = ringbuf.push(value);
|
double popped = ringbuf.push(value);
|
||||||
if (ringbuf.count() == 1) {
|
if (ringbuf.count() == 1) {
|
||||||
mean = value;
|
mean = value;
|
||||||
|
|||||||
Reference in New Issue
Block a user