improving optimo sampler and rates

This commit is contained in:
Jonathan Shook 2023-10-10 18:24:19 -05:00
parent e64de7476e
commit 9978d50d7e
8 changed files with 337 additions and 108 deletions

View File

@ -27,6 +27,7 @@ import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
@ -69,7 +70,8 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
private long refillIntervalNanos = 1_000_000_0;
private int maxActivePool, burstPoolSize, maxOverActivePool, ticksPerOp;
private SimRateSpec spec;
private long blocks;
private LongAdder blocks = new LongAdder();
private final ReentrantLock fillerLock = new ReentrantLock(false);
@ -246,7 +248,7 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
}
public long block() {
this.blocks++;
this.blocks.increment();
try {
this.activePool.acquire(ticksPerOp);
} catch (InterruptedException e) {
@ -325,7 +327,7 @@ public class SimRate extends NBBaseComponent implements RateLimiter, Thread.Unca
(double) this.activePool.availablePermits() / this.maxActivePool * 100.0,
(double) this.activePool.availablePermits() / this.maxOverActivePool * 100.0,
this.waitingPool.get(),
this.blocks,
this.blocks.sum(),
this.fillerLock.isLocked() ? "LOCKED" : "UNLOCKED", spec.ticksPerOp()
);

View File

@ -0,0 +1,80 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.sandbox;
import io.nosqlbench.api.config.standard.TestComponent;
import io.nosqlbench.components.NBComponent;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRate;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRateSpec;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.concurrent.TimeUnit;
@State(Scope.Group)
@Measurement(time = 10,timeUnit = TimeUnit.SECONDS)
public class SimRateSanityTest {
private final NBComponent parent = new TestComponent("rltest","rltest");
public static void main(String[] args) {
Options jmhOptions = new OptionsBuilder()
// .include("simrate[0-9]+")
// .include("simrate(1|24|240)")
.forks(1)
.warmupBatchSize(1)
.warmupIterations(0)
.build();
try {
new Runner(jmhOptions).run();
} catch (RunnerException e) {
throw new RuntimeException(e);
}
}
private SimRate rl;
@Setup
public void setup() {
SimRateSpec spec = new SimRateSpec(250,1.01);
rl = new SimRate(parent,spec);
}
@Benchmark
@Group("at250ops1thread")
@GroupThreads(1)
@BenchmarkMode(Mode.Throughput)
@Disabled
public void at250ops1thread() {
rl.block();
}
@Benchmark
@Group("at250ops240threads")
@GroupThreads(240)
@BenchmarkMode(Mode.Throughput)
@Disabled
public void at250ops240threads() {
rl.block();
}
}

View File

@ -204,7 +204,7 @@ public class GrafanaClient {
HttpRequest request = rqb.build();
response = client.send(request, HttpResponse.BodyHandlers.ofString());
} catch (Exception e) {
if (e.getMessage().contains("WWW-Authenticate header missing")) {
if (e.getMessage()!=null && e.getMessage().contains("WWW-Authenticate header missing")) {
throw new RuntimeException("Java HttpClient was not authorized, and it saw no WWW-Authenticate header" +
" in the response, so this is probably Grafana telling you that the auth scheme failed. Normally " +
"this error would be thrown by Java HttpClient:" + e.getMessage());

View File

@ -0,0 +1,134 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.optimizers;
import io.nosqlbench.api.optimizers.MVLogger;
import org.apache.commons.math3.analysis.MultivariateFunction;
import org.apache.commons.math3.exception.MathIllegalStateException;
import org.apache.commons.math3.optim.*;
import org.apache.commons.math3.optim.nonlinear.scalar.GoalType;
import org.apache.commons.math3.optim.nonlinear.scalar.ObjectiveFunction;
import org.apache.commons.math3.optim.nonlinear.scalar.noderiv.BOBYQAOptimizer;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
/**
* Consider JSAT
* <p>
* https://en.wikipedia.org/wiki/Coordinate_descent
* <p>
* http://firsttimeprogrammer.blogspot.com/2014/09/multivariable-gradient-descent.html
* <p>
* https://towardsdatascience.com/machine-learning-bit-by-bit-multivariate-gradient-descent-e198fdd0df85
* <p>
* https://pdfs.semanticscholar.org/d142/3994d7b4462994925663959721130755b275.pdf
* <p>
* file:///tmp/bfgs-example.pdf
* <p>
* https://github.com/vinhkhuc/lbfgs4j
* <p>
* https://github.com/EdwardRaff/JSAT/wiki/Algorithms
* <p>
* http://www.optimization-online.org/DB_FILE/2010/05/2616.pdf
* <p>
* https://github.com/dpressel/sgdtk
*/
public class TestOptimoExperiments2 {
private final static Logger logger = LogManager.getLogger(TestOptimoExperiments2.class);
@Test
public void testBrokenParams() {
MultivariateFunction m = new LinearOneParam();
MVLogger fLogger = new MVLogger(m);
// MultivariateDifferentiableFunction mvdf =FunctionUtils.
// MultivariateVectorFunction mvf = new GradientFunction(mvdf);
// ObjectiveFunctionGradient ofg = new ObjectiveFunctionGradient(mvf);
SimpleBounds bounds = new SimpleBounds(
new double[]{1.0d, 1.0d},
new double[]{10000, 1000});
List<OptimizationData> od = List.of(
new ObjectiveFunction(m),
GoalType.MAXIMIZE,
new InitialGuess(new double[]{2500, 250}),
new MaxEval(1000)
, bounds
);
BOBYQAOptimizer mo = new BOBYQAOptimizer(
5,
1000.0,
1.0
);
PointValuePair result = null;
try {
result = mo.optimize(od.toArray(new OptimizationData[0]));
} catch (MathIllegalStateException missed) {
if (missed.getMessage().contains("trust region step has failed to reduce Q")) {
logger.warn(missed.getMessage()+", so returning current result.");
result = new PointValuePair(fLogger.getLastEntry().params(), fLogger.getLastEntry().value());
} else {
throw missed;
}
}
logger.debug("point:" + Arrays.toString(result.getPoint()) +" value=" + m.value(result.getPoint()));
}
private static class LinearOneParam implements MultivariateFunction {
private int iter;
private final Random r = new Random(System.nanoTime());
@Override
public double value(double[] doubles) {
iter++;
double value = doubles[0] + (System.currentTimeMillis() / 1000000000000d);
System.out.format("params %s val=%.3f\n\n", Arrays.stream(doubles).mapToObj(String::valueOf).collect(Collectors.joining(",")), value);
return value;
// double value = r.nextDouble()*10.0;
// System.out.format("i:%d NOISE=%.3f GUESS=%s\n",iter, value, Arrays.toString(doubles));
//
// double product = 1.0d;
// for (double aDouble : doubles) {
// double component = 100.0 - Math.abs(aDouble-100);
// product+=component;
// System.out.print(" +" + component);
// }
// value += product;
// System.out.format(" val=%.3f\n\n",value);
// return value;
}
}
// private final static DoubleBinaryOperator f1 = (a,b) -> {
// return 5+Math.pow(a,2.0d)+Math.pow(b,2.0d);
// };
}

View File

@ -24,34 +24,38 @@ import java.util.List;
public class MVLogger implements MultivariateFunction {
private final MultivariateFunction function;
List<List<Double>> log = new ArrayList<>();
List<Entry> entries = new ArrayList<>();
public MVLogger(MultivariateFunction function) {
this.function = function;
}
@Override
public double value(double[] doubles) {
ArrayList<Double> params = new ArrayList<>(doubles.length);
log.add(params);
return function.value(doubles);
public double value(double[] params) {
double value = function.value(params);
entries.add(new Entry(params,value));
return value;
}
public List<List<Double>> getLogList() {
return log;
public List<Entry> getLogList() {
return entries;
}
public List<Entry> getLog() {
return entries;
}
public double[][] getLogArray() {
double[][] ary = new double[log.size()][];
for (int row = 0; row < log.size(); row++) {
List<Double> columns = log.get(row);
double[] rowary = new double[columns.size()];
ary[row]=rowary;
for (int col = 0; col < log.get(row).size(); col++) {
rowary[col]=columns.get(col);
}
double[][] ary = new double[entries.size()][];
for (int row = 0; row < entries.size(); row++) {
Entry entry = entries.get(row);
ary[row]=entry.params();
}
return ary;
}
public Entry getLastEntry() {
return entries.getLast();
}
public static record Entry(double[] params, double value) {};
}

View File

@ -24,7 +24,7 @@ import java.util.function.DoubleSupplier;
* This is a helper class that makes it easy to bundle up a combination of measurable
* factors and get a windowed sample from them. To use it, add your named data sources
* with their coefficients, and optionally a callback which resets the measurement
* buffers for the next time. When you call {@link #getCurrentWindowValue()}, all callbacks
* buffers for the next time. When you call {@link #getValue()}, all callbacks
* are used after the value computation is complete.
*
* <P>This is NOT thread safe!</P>
@ -32,19 +32,8 @@ import java.util.function.DoubleSupplier;
public class PerfWindowSampler {
private final List<Criterion> criteria = new ArrayList<>();
private boolean openWindow = false;
private final static int STARTS = 0;
private final static int ENDS = 1;
private final static int WEIGHTED = 2;
private final static int START_TIME = 3;
private final static int END_TIME = 4;
private final static int ARYSIZE = END_TIME+1;
/**
* window, measure, START,STOP,WEIGHTED
*/
private double[][][] data;
private int window = -1;
private final WindowSamples windows = new WindowSamples();
private WindowSample window;
void addDirect(String name, DoubleSupplier supplier, double weight, Runnable callback) {
@ -65,90 +54,40 @@ public class PerfWindowSampler {
});
}
double getCurrentWindowValue() {
if (openWindow) {
throw new RuntimeException("invalid access to checkpoint value on open window.");
}
double product = 1.0d;
if (data==null) {
double getValue() {
if (windows.size()==0) {
return Double.NaN;
}
double[][] values = data[window];
for (int i = 0; i < criteria.size(); i++) {
product *= values[i][WEIGHTED];
}
return product;
return windows.getLast().value();
}
private double valueOf(int measuredItem) {
double[] vals = data[window][measuredItem];
if (criteria.get(measuredItem).delta) {
double duration = (vals[END_TIME] - vals[START_TIME])/1000D;
double increment = vals[ENDS] - vals[STARTS];
return increment / duration;
} else {
return vals[ENDS];
}
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder("PERF " + (openWindow ? "OPENWINDOW! " : "" ) + "sampler value =").append(getCurrentWindowValue()).append("\n");
for (int i = 0; i < criteria.size(); i++) {
Criterion criterion = criteria.get(i);
sb.append("->").append(criterion.name).append(" last=").append(valueOf(i)).append("\n");
}
StringBuilder sb = new StringBuilder("PERF VALUE=").append(getValue()).append("\n");
sb.append("windows:\n"+windows.getLast().toString());
return sb.toString();
}
public void startWindow() {
startWindow(System.currentTimeMillis());
}
public void startWindow(long now) {
openWindow=true;
window++;
if (this.data == null) {
this.data = new double[1][criteria.size()][ARYSIZE];
}
if (this.window >=data.length) {
double[][][] newary = new double[data.length<<1][criteria.size()][ARYSIZE];
System.arraycopy(data,0,newary,0,data.length);
this.data = newary;
}
for (int i = 0; i < criteria.size(); i++) {
data[window][i][START_TIME] = now;
Criterion criterion = criteria.get(i);
if (criterion.delta) {
data[window][i][STARTS] = criterion.supplier.getAsDouble();
} else {
data[window][i][STARTS] = Double.NaN;
}
criterion.callback.run();
}
for (Criterion criterion : criteria) {
criterion.callback.run();
if (window!=null) {
throw new RuntimeException("cant start window twice in a row. Must close window first");
}
List<ParamSample> samples = criteria.stream().map(c -> ParamSample.init(c).start(now)).toList();
this.window = new WindowSample(samples);
}
public void stopWindow() {
stopWindow(System.currentTimeMillis());
}
public void stopWindow(long now) {
for (int i = 0; i < criteria.size(); i++) {
data[window][i][END_TIME] = now;
Criterion criterion = criteria.get(i);
double endmark = criterion.supplier.getAsDouble();
data[window][i][ENDS] = endmark;
double sample = valueOf(i);
data[window][i][WEIGHTED] = sample* criterion.weight;
for (int i = 0; i < window.size(); i++) {
window.set(i,window.get(i).stop(now));
}
openWindow=false;
windows.add(window);
window=null;
}
public static record Criterion(
@ -158,4 +97,55 @@ public class PerfWindowSampler {
Runnable callback,
boolean delta
) { }
public static class WindowSamples extends ArrayList<WindowSample> {}
public static class WindowSample extends ArrayList<ParamSample> {
public WindowSample(List<ParamSample> samples) {
super(samples);
}
public double value() {
return stream().mapToDouble(ParamSample::weightedValue).sum();
}
}
public static record ParamSample(Criterion criterion, long startAt, long endAt, double startval, double endval) {
public double weightedValue() {
return rawValue() * criterion().weight;
}
private double rawValue() {
if (criterion.delta()) {
return endval - startval;
}
return endval;
}
private double rate() {
return rawValue()/seconds();
}
private double seconds() {
return ((double)(endAt-startAt)) / 1000d;
}
public static ParamSample init(Criterion criterion) {
return new ParamSample(criterion, 0,0,Double.NaN, Double.NaN);
}
public ParamSample start(long startTime) {
criterion.callback.run();
double v1 = criterion.supplier.getAsDouble();
return new ParamSample(criterion,startTime,0L, v1, Double.NaN);
}
public ParamSample stop(long stopTime) {
double v2 = criterion.supplier.getAsDouble();
return new ParamSample(criterion,startAt,stopTime,startval, v2);
}
@Override
public String toString() {
return "sample[" + criterion.name() + "] "
+ ((Double.isNaN(endval)) ? " incomplete" : "dT:" + seconds() + " dV:" + rawValue() + " rate:" + rate() + " v1:" + startval + " v2:" + endval);
}
}
}

View File

@ -27,7 +27,6 @@ import io.nosqlbench.components.NBComponent;
import io.nosqlbench.components.events.ParamChange;
import io.nosqlbench.engine.api.activityapi.core.Activity;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.CycleRateSpec;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRate;
import io.nosqlbench.engine.api.activityapi.ratelimits.simrate.SimRateSpec;
import io.nosqlbench.engine.core.lifecycle.scenario.direct.SCBaseScenario;
import org.apache.logging.log4j.LogManager;
@ -72,7 +71,7 @@ public class SC_optimo extends SCBaseScenario {
BobyqaOptimizerInstance bobby = create().bobyqaOptimizer();
bobby.param("rate", 1.0d, 10000.d);
bobby.param("threads", 1.0d, 1000.0d);
bobby.param("noise", 100d, 200.0d);
// bobby.param("noise", 100d, 200.0d);
bobby.setInitialRadius(1000000.0).setStoppingRadius(0.001).setMaxEval(1000);
Activity flywheel = controller.start(activityParams);
@ -86,9 +85,14 @@ public class SC_optimo extends SCBaseScenario {
* <P>The parameter values will be passed in as an array, pair-wise with the param calls above.</P>
*/
flywheel.onEvent(new ParamChange<>(new CycleRateSpec(5.0, 1.1d, SimRateSpec.Verb.restart)));
PerfWindowSampler sampler = new PerfWindowSampler();
NBMetricTimer result_success_timer = flywheel.find().timer("name:result_success");
System.out.println("c1:" + result_success_timer.getCount());
sampler.addDeltaTime("achieved_rate", result_success_timer::getCount, 1000.0);
System.out.println("c2:" + result_success_timer.getCount());
stdout.println(" RATE>>> " + flywheel.getCycleLimiter().toString());
final DeltaSnapshotReader snapshotter = result_success_timer.getDeltaReader();
AtomicReference<ConvenientSnapshot> snapshot = new AtomicReference<>(snapshotter.getDeltaSnapshot());
// ValidAtOrBelow below15000 = ValidAtOrBelow.max(15000);
@ -98,28 +102,43 @@ public class SC_optimo extends SCBaseScenario {
// -1.0,
// () -> snapshot.set(snapshotter.getDeltaSnapshot())
// );
sampler.startWindow();
ToDoubleFunction<double[]> f = new ToDoubleFunction<double[]>() {
@Override
public double applyAsDouble(double[] values) {
stdout.println("params=" + Arrays.toString(values));
System.out.println("c3:" + result_success_timer.getCount());
stdout.println(" RATE>>> " + flywheel.getCycleLimiter().toString());
int threads = (int) bobby.getParams().getValue("threads", values);
stdout.println("SETTING threads to " + threads);
flywheel.getActivityDef().setThreads(threads);
stdout.println("PARAM threads set to " + threads + " confirm: " + flywheel.getActivityDef().getThreads());
double rate = bobby.getParams().getValue("rate", values);
stdout.println("SETTING rate to "+ rate);
CycleRateSpec ratespec = new CycleRateSpec(rate, 1.1d, SimRateSpec.Verb.restart);
flywheel.onEvent(new ParamChange<>(ratespec));
stdout.println("PARAM cyclerate set to " +rate);
stdout.println(" RATE>>> " + flywheel.getCycleLimiter().toString());
System.out.println("c3b:" + result_success_timer.getCount());
stdout.println(" RATE>>> " + flywheel.getCycleLimiter().toString());
stdout.println("waiting 2 seconds for stabilization");
controller.waitMillis(2000);
System.out.println("c4:" + result_success_timer.getCount());
stdout.println(" RATE>>> " + flywheel.getCycleLimiter().toString());
sampler.startWindow();
stdout.println("waiting " + seconds + " seconds...");
controller.waitMillis(seconds * 1000L);
sampler.stopWindow();
double value = sampler.getCurrentWindowValue();
stdout.println("RESULT: " + sampler);
System.out.println("c5:" + result_success_timer.getCount());
stdout.println(" RATE>>> " + flywheel.getCycleLimiter().toString());
double value = sampler.getValue();
stdout.println("RESULT:\n" + sampler);
stdout.println("-".repeat(40));
return value;
}
@ -131,7 +150,7 @@ public class SC_optimo extends SCBaseScenario {
stdout.println("optimized result was " + result);
stdout.println("map of result was " + result.getMap());
// TODO: controller start should not return the activity itself, but a control point, like activityDef
// TODO: controller startAt should not return the activity itself, but a control point, like activityDef
// TODO: warn user if one of the result params is near or at the range allowed, as there
// could be a better result if the range is arbitrarily limiting the parameter space.

View File

@ -34,7 +34,7 @@ class PerfWindowSamplerTest {
pws.startWindow();
pws.stopWindow();
double value = pws.getCurrentWindowValue();
double value = pws.getValue();
assertThat(value).isCloseTo(9.0, Offset.offset(0.002));
}
@ -56,7 +56,7 @@ class PerfWindowSamplerTest {
a2.set(10L);
pws.stopWindow(1000L);
double value = pws.getCurrentWindowValue();
double value = pws.getValue();
assertThat(value).isCloseTo(30.0,Offset.offset(0.001));
pws.startWindow(10000L);
@ -64,7 +64,7 @@ class PerfWindowSamplerTest {
a2.set(42); // 42-1=41; 41+32=73
pws.stopWindow(11000L);
double value2 = pws.getCurrentWindowValue();
double value2 = pws.getValue();
assertThat(value2).isCloseTo(1248.0,Offset.offset(0.001));
}