mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
improve rate limiter tests
This commit is contained in:
parent
bc0fb04fbd
commit
ce73fca6ba
@ -62,7 +62,7 @@ public class RateLimiterPerfTestMethods {
|
||||
return perf.getLastResult();
|
||||
}
|
||||
|
||||
public Result rateLimiterSingleThreadedConvergence(Function<RateSpec,RateLimiter> rlf, RateSpec rs, long startingCycles, double margin) {
|
||||
public Result rateLimiterSingleThreadedConvergence(Function<RateSpec, RateLimiter> rlf, RateSpec rs, long startingCycles, double margin) {
|
||||
//rl.applyRateSpec(rl.getRateSpec().withOpsPerSecond(1E9));
|
||||
Bounds bounds = new Bounds(startingCycles, 2);
|
||||
Perf perf = new Perf("nanotime");
|
||||
@ -139,21 +139,21 @@ public class RateLimiterPerfTestMethods {
|
||||
double duration = (endAt - startAt) / 1000000000.0d;
|
||||
double acqops = (count / duration);
|
||||
|
||||
System.out.println(rl.toString());
|
||||
System.out.println(rl);
|
||||
|
||||
System.out.println(ANSI_Blue +
|
||||
String.format(
|
||||
"spec: %s\n count: %9d, duration %.5fS, acquires/s %.3f, nanos/op: %f\n delay: %d (%.5fS)",
|
||||
rl.getRateSpec(),
|
||||
count, duration, acqops, (1_000_000_000.0d / acqops), divDelay, (divDelay / 1_000_000_000.0d)) +
|
||||
ANSI_Reset);
|
||||
String.format(
|
||||
"spec: %s\n count: %9d, duration %.5fS, acquires/s %.3f, nanos/op: %f\n delay: %d (%.5fS)",
|
||||
rl.getRateSpec(),
|
||||
count, duration, acqops, (1_000_000_000.0d / acqops), divDelay, (divDelay / 1_000_000_000.0d)) +
|
||||
ANSI_Reset);
|
||||
|
||||
}
|
||||
|
||||
long[] delays = results.stream().mapToLong(Long::longValue).toArray();
|
||||
|
||||
String delaySummary = Arrays.stream(delays).mapToDouble(d -> (double) d / 1_000_000_000.0D).mapToObj(d -> String.format("%.3f", d))
|
||||
.collect(Collectors.joining(","));
|
||||
.collect(Collectors.joining(","));
|
||||
System.out.println("delays in seconds:\n" + delaySummary);
|
||||
System.out.println("delays in ns:\n" + Arrays.toString(delays));
|
||||
|
||||
@ -176,7 +176,7 @@ public class RateLimiterPerfTestMethods {
|
||||
* This a low-overhead test for multi-threaded access to the same getOpsPerSec limiter. It calculates the
|
||||
* effective concurrent getOpsPerSec under atomic contention.
|
||||
*/
|
||||
public Perf testRateLimiterMultiThreadedContention(Function<RateSpec,RateLimiter> rlFunc, RateSpec spec, long iterations, int threadCount) {
|
||||
public Perf testRateLimiterMultiThreadedContention(Function<RateSpec, RateLimiter> rlFunc, RateSpec spec, long iterations, int threadCount) {
|
||||
System.out.println("Running " + Thread.currentThread().getStackTrace()[1].getMethodName());
|
||||
|
||||
RateLimiter rl = rlFunc.apply(spec);
|
||||
@ -187,24 +187,24 @@ public class RateLimiterPerfTestMethods {
|
||||
}
|
||||
RateLimiterPerfTestMethods.TestExceptionHandler errorhandler = new RateLimiterPerfTestMethods.TestExceptionHandler();
|
||||
RateLimiterPerfTestMethods.TestThreadFactory threadFactory = new RateLimiterPerfTestMethods.TestThreadFactory(errorhandler);
|
||||
ExecutorService tp = Executors.newFixedThreadPool(threadCount+1, threadFactory);
|
||||
ExecutorService tp = Executors.newFixedThreadPool(threadCount + 1, threadFactory);
|
||||
|
||||
System.out.format("Running %d iterations split over %d threads (%d) at getOpsPerSec %.3f\n", iterations, threadCount, (iterations / threadCount), rate);
|
||||
System.out.format("Running %,d iterations split over %,d threads (%,d per) at %,.3f ops/s\n", iterations, threadCount, (iterations / threadCount), rate);
|
||||
RateLimiterPerfTestMethods.Acquirer[] threads = new RateLimiterPerfTestMethods.Acquirer[threadCount];
|
||||
DeltaHdrHistogramReservoir stats = new DeltaHdrHistogramReservoir("times", 5);
|
||||
|
||||
CyclicBarrier barrier = new CyclicBarrier(threadCount+1);
|
||||
CyclicBarrier barrier = new CyclicBarrier(threadCount + 1);
|
||||
|
||||
RateLimiterStarter starter = new RateLimiterStarter(barrier, rl);
|
||||
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
threads[i] = new RateLimiterPerfTestMethods.Acquirer(i, rl, (int) (iterationsPerThread), stats, barrier);
|
||||
threads[i] = new RateLimiterPerfTestMethods.Acquirer(i, rl, iterationsPerThread, stats, barrier);
|
||||
// threads[i] = new RateLimiterPerfTestMethods.Acquirer(i, rl, (int) (iterations / threadCount), stats, barrier);
|
||||
}
|
||||
|
||||
tp.execute(starter);
|
||||
|
||||
System.out.println("limiter stats:" + rl);
|
||||
System.out.println(rl);
|
||||
System.out.format("submitting (%d threads)...\n", threads.length);
|
||||
List<Future<Result>> futures = new ArrayList<>();
|
||||
for (int i = 0; i < threadCount; i++) {
|
||||
@ -223,7 +223,7 @@ public class RateLimiterPerfTestMethods {
|
||||
|
||||
errorhandler.throwIfAny();
|
||||
|
||||
System.out.println("limiter stats:" + rl);
|
||||
System.out.println(rl);
|
||||
|
||||
Perf aggregatePerf = new Perf("contended with " + threadCount + " threads for " + iterations + " iterations for " + rl.getRateSpec().toString());
|
||||
futures.stream().map(f -> {
|
||||
@ -234,7 +234,7 @@ public class RateLimiterPerfTestMethods {
|
||||
}
|
||||
}).forEachOrdered(aggregatePerf::add);
|
||||
|
||||
System.out.println(aggregatePerf);
|
||||
// System.out.println(aggregatePerf);
|
||||
|
||||
// if (rl instanceof HybridRateLimiter) {
|
||||
// String refillLog = ((HybridRateLimiter) rl).getRefillLog();
|
||||
@ -246,8 +246,8 @@ public class RateLimiterPerfTestMethods {
|
||||
}
|
||||
|
||||
private static class RateLimiterStarter implements Runnable {
|
||||
private CyclicBarrier barrier;
|
||||
private RateLimiter rl;
|
||||
private final CyclicBarrier barrier;
|
||||
private final RateLimiter rl;
|
||||
|
||||
public RateLimiterStarter(CyclicBarrier barrier, RateLimiter rl) {
|
||||
this.barrier = barrier;
|
||||
@ -257,9 +257,9 @@ public class RateLimiterPerfTestMethods {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
System.out.println("awaiting barrier (starter) (" + barrier.getNumberWaiting() + " awaiting)");
|
||||
// System.out.println("awaiting barrier (starter) (" + barrier.getNumberWaiting() + " awaiting)");
|
||||
barrier.await(60, TimeUnit.SECONDS);
|
||||
System.out.println("started the rate limiter (starter) (" + barrier.getNumberWaiting() + " awaiting)");
|
||||
// System.out.println("started the rate limiter (starter) (" + barrier.getNumberWaiting() + " awaiting)");
|
||||
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
@ -291,7 +291,7 @@ public class RateLimiterPerfTestMethods {
|
||||
private final int threadIdx;
|
||||
private final DeltaHdrHistogramReservoir reservoir;
|
||||
private final CyclicBarrier barrier;
|
||||
private long iterations;
|
||||
private final long iterations;
|
||||
|
||||
public Acquirer(int i, RateLimiter limiter, int iterations, DeltaHdrHistogramReservoir reservoir, CyclicBarrier barrier) {
|
||||
this.threadIdx = i;
|
||||
@ -304,14 +304,18 @@ public class RateLimiterPerfTestMethods {
|
||||
@Override
|
||||
public Result call() {
|
||||
// synchronized (barrier) {
|
||||
try {
|
||||
System.out.println("awaiting barrier " + this.threadIdx + " (" + barrier.getNumberWaiting() + " awaiting)");
|
||||
barrier.await(60, TimeUnit.SECONDS);
|
||||
|
||||
// System.out.println("starting " + this.threadIdx);
|
||||
} catch (Exception be) {
|
||||
throw new RuntimeException(be); // This should not happen unless the test is broken
|
||||
try {
|
||||
if (this.threadIdx == 0) {
|
||||
System.out.println("awaiting barrier");
|
||||
}
|
||||
barrier.await(60, TimeUnit.SECONDS);
|
||||
if (this.threadIdx == 0) {
|
||||
System.out.println("starting all threads");
|
||||
}
|
||||
|
||||
} catch (Exception be) {
|
||||
throw new RuntimeException(be); // This should not happen unless the test is broken
|
||||
}
|
||||
// }
|
||||
long startTime = System.nanoTime();
|
||||
for (int i = 0; i < iterations; i++) {
|
||||
|
@ -33,8 +33,61 @@ import java.util.function.Function;
|
||||
*/
|
||||
public class TestRateLimiterPerf1E8 {
|
||||
|
||||
private Function<RateSpec, RateLimiter> rlFunction = rs -> new HybridRateLimiter(ActivityDef.parseActivityDef("alias=tokenrl"),"hybrid", rs.withVerb(RateSpec.Verb.configure));
|
||||
private RateLimiterPerfTestMethods methods = new RateLimiterPerfTestMethods();
|
||||
private final Function<RateSpec, RateLimiter> rlFunction =
|
||||
rs -> new HybridRateLimiter(
|
||||
ActivityDef.parseActivityDef("alias=tokenrl"),
|
||||
"hybrid",
|
||||
rs.withVerb(RateSpec.Verb.configure)
|
||||
);
|
||||
private final RateLimiterPerfTestMethods methods = new RateLimiterPerfTestMethods();
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
public void test100Mops_4000threads() {
|
||||
Perf perf = methods.testRateLimiterMultiThreadedContention(
|
||||
rlFunction,
|
||||
new RateSpec(1E8, 1.1),
|
||||
100_000_000,
|
||||
4000
|
||||
);
|
||||
System.out.println(perf.getLastResult());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
public void test100Mops_2000threads() {
|
||||
Perf perf = methods.testRateLimiterMultiThreadedContention(
|
||||
rlFunction,
|
||||
new RateSpec(1E8, 1.1),
|
||||
100_000_000,
|
||||
2000
|
||||
);
|
||||
System.out.println(perf.getLastResult());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
public void test100Mops_1000threads() {
|
||||
Perf perf = methods.testRateLimiterMultiThreadedContention(
|
||||
rlFunction,
|
||||
new RateSpec(1E8, 1.1),
|
||||
100_000_000,
|
||||
1000
|
||||
);
|
||||
System.out.println(perf.getLastResult());
|
||||
}
|
||||
|
||||
@Test
|
||||
@Disabled
|
||||
public void test100Mops_320threads() {
|
||||
Perf perf = methods.testRateLimiterMultiThreadedContention(
|
||||
rlFunction,
|
||||
new RateSpec(1E8, 1.1),
|
||||
100_000_000,
|
||||
320
|
||||
);
|
||||
System.out.println(perf.getLastResult());
|
||||
}
|
||||
|
||||
// 160 threads at 100_000_000 ops/s
|
||||
// 1600000000_ops 149.351811_S 10712960.186_ops_s, 93_ns_op
|
||||
@ -46,7 +99,12 @@ public class TestRateLimiterPerf1E8 {
|
||||
@Test
|
||||
@Disabled
|
||||
public void test100Mops_160threads() {
|
||||
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 100_000_000,160);
|
||||
Perf perf = methods.testRateLimiterMultiThreadedContention(
|
||||
rlFunction,
|
||||
new RateSpec(1E8, 1.1),
|
||||
100_000_000,
|
||||
160
|
||||
);
|
||||
System.out.println(perf.getLastResult());
|
||||
}
|
||||
|
||||
@ -57,7 +115,7 @@ public class TestRateLimiterPerf1E8 {
|
||||
@Test
|
||||
@Disabled
|
||||
public void test100Mops_80threads() {
|
||||
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 100_000_000,80);
|
||||
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 100_000_000, 80);
|
||||
System.out.println(perf.getLastResult());
|
||||
}
|
||||
|
||||
@ -70,7 +128,7 @@ public class TestRateLimiterPerf1E8 {
|
||||
@Test
|
||||
@Disabled
|
||||
public void test100Mops_40threads() {
|
||||
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 100_000_000,40);
|
||||
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 100_000_000, 40);
|
||||
System.out.println(perf.getLastResult());
|
||||
}
|
||||
|
||||
@ -90,7 +148,7 @@ public class TestRateLimiterPerf1E8 {
|
||||
@Test
|
||||
@Disabled
|
||||
public void test100Mops_20threads() {
|
||||
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 100_000_000,20);
|
||||
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 100_000_000, 20);
|
||||
System.out.println(perf.getLastResult());
|
||||
}
|
||||
|
||||
@ -106,7 +164,7 @@ public class TestRateLimiterPerf1E8 {
|
||||
@Test
|
||||
@Disabled
|
||||
public void test100Mops_10threads() {
|
||||
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 100_000_000,10);
|
||||
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 100_000_000, 10);
|
||||
System.out.println(perf.getLastResult());
|
||||
}
|
||||
|
||||
@ -123,7 +181,7 @@ public class TestRateLimiterPerf1E8 {
|
||||
@Test
|
||||
@Disabled
|
||||
public void test100Mops_5threads() {
|
||||
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 100_000_000,5);
|
||||
Perf perf = methods.testRateLimiterMultiThreadedContention(rlFunction, new RateSpec(1E8, 1.1), 100_000_000, 5);
|
||||
System.out.println(perf.getLastResult());
|
||||
}
|
||||
|
||||
|
@ -26,7 +26,7 @@ public class Result {
|
||||
private final long start;
|
||||
private final long end;
|
||||
private final long ops;
|
||||
private String description;
|
||||
private final String description;
|
||||
|
||||
public Result(String description, long start, long end, long ops) {
|
||||
this.description = description;
|
||||
@ -58,15 +58,15 @@ public class Result {
|
||||
@Override
|
||||
public String toString() {
|
||||
long time_ns = end - start;
|
||||
return String.format("'%s': %d_ops %f_S %.3f_ops_s, %.0f_ns_op", description, ops, getTimeSeconds(), getOpsPerSec(), getNsPerOp());
|
||||
return String.format("'%s': %d_ops %,f_S %,.3f_ops_s, %,.0f_ns_op", description, ops, getTimeSeconds(), getOpsPerSec(), getNsPerOp());
|
||||
}
|
||||
|
||||
public static List<String> toString(List<Result> results) {
|
||||
List<String> ldesc = results.stream().map(Result::getDescription).collect(Collectors.toList());
|
||||
List<String> lops = results.stream().map(r -> String.format("%d_ops",r.getTotalOps())).collect(Collectors.toList());
|
||||
List<String> ltime_s = results.stream().map(r -> String.format("%f_S",r.getTimeSeconds())).collect(Collectors.toList());
|
||||
List<String> lops_s = results.stream().map(r -> String.format("%.3f_ops_s",r.getOpsPerSec())).collect(Collectors.toList());
|
||||
List<String> lns_op = results.stream().map(r -> String.format("%.0f_ns_op",r.getNsPerOp())).collect(Collectors.toList());
|
||||
List<String> lops = results.stream().map(r -> String.format("%,d_ops",r.getTotalOps())).collect(Collectors.toList());
|
||||
List<String> ltime_s = results.stream().map(r -> String.format("%,f_S",r.getTimeSeconds())).collect(Collectors.toList());
|
||||
List<String> lops_s = results.stream().map(r -> String.format("%,.3f_ops_s",r.getOpsPerSec())).collect(Collectors.toList());
|
||||
List<String> lns_op = results.stream().map(r -> String.format("%,.0f_ns_op",r.getNsPerOp())).collect(Collectors.toList());
|
||||
|
||||
int sizeof_ldesc = ldesc.stream().mapToInt(String::length).max().orElse(0);
|
||||
int sizeof_lops = lops.stream().mapToInt(String::length).max().orElse(0);
|
||||
@ -83,7 +83,6 @@ public class Result {
|
||||
return rows;
|
||||
}
|
||||
|
||||
|
||||
public long getStartNanos() {
|
||||
return this.start;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user