mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
cleanup and document virtdata bundled app
This commit is contained in:
@@ -47,7 +47,7 @@ public class MutableFrontMatter extends LinkedHashMap<String,List<String>> {
|
||||
put(TITLE,List.of(title));
|
||||
}
|
||||
|
||||
public void setWeight(int weight) {
|
||||
public void setWeight(long weight) {
|
||||
put(WEIGHT,List.of(String.valueOf(weight)));
|
||||
}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
* Copyright (c) 2022-2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -22,7 +22,6 @@ public class RunData {
|
||||
public long min;
|
||||
public long max;
|
||||
public int buffersize;
|
||||
public boolean isolated;
|
||||
public double totalGenTimeMs;
|
||||
public double totalCmpTimeMs;
|
||||
|
||||
@@ -35,7 +34,6 @@ public class RunData {
|
||||
this.min = min;
|
||||
this.max = max;
|
||||
this.buffersize = buffersize;
|
||||
this.isolated = isolated;
|
||||
this.totalGenTimeMs = totalGenTimeMs;
|
||||
this.totalCmpTimeMs = totalCmpTimeMs;
|
||||
}
|
||||
@@ -49,12 +47,11 @@ public class RunData {
|
||||
" max = " + max + "\n" +
|
||||
" [count] = " + (max - min) + "\n" +
|
||||
" buffersize = " + buffersize + "\n" +
|
||||
" isolated = " + isolated + "\n" +
|
||||
" [totalGenTimeMs] = " + totalGenTimeMs + "\n" +
|
||||
" [totalCmpTimeMs] = " + totalCmpTimeMs + "\n" +
|
||||
String.format(" [genPerMs] = %.3f\n", ((double) threads * (double) (max - min)) / totalGenTimeMs) +
|
||||
String.format(" [cmpPerMs] = %.3f\n", ((double) threads * (double) (max - min)) / totalCmpTimeMs) +
|
||||
String.format(" [genPerS] = %.3f\n", 1000.0d * ((double) threads * (double) (max-min)) / totalGenTimeMs) +
|
||||
String.format(" [cmpPerS] = %.3f\n", 1000.0d * ((double) threads * (double) (max-min)) / totalCmpTimeMs);
|
||||
String.format(" [genPerMs] = %.3f\n", ((double) threads * (double) (max - min)) / totalGenTimeMs) +
|
||||
String.format(" [cmpPerMs] = %.3f\n", ((double) threads * (double) (max - min)) / totalCmpTimeMs) +
|
||||
String.format(" [genPerS] = %.3f\n", 1000.0d * ((double) threads * (double) (max-min)) / totalGenTimeMs) +
|
||||
String.format(" [cmpPerS] = %.3f\n", 1000.0d * ((double) threads * (double) (max-min)) / totalCmpTimeMs);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
* Copyright (c) 2022-2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -28,14 +28,14 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
public class ValuesCheckerCoordinator implements Callable<RunData> {
|
||||
private static final Logger logger =
|
||||
LogManager.getLogger(ValuesCheckerCoordinator.class);
|
||||
LogManager.getLogger(ValuesCheckerCoordinator.class);
|
||||
|
||||
private final String specifier;
|
||||
private final int threads;
|
||||
private final int bufsize;
|
||||
private final long end;
|
||||
private final long start;
|
||||
private final boolean isolated;
|
||||
private final boolean printValues;
|
||||
private final ReentrantLock lock;
|
||||
private final Condition goTime;
|
||||
private final ConcurrentLinkedDeque<Throwable> errors = new ConcurrentLinkedDeque<>();
|
||||
@@ -47,18 +47,18 @@ public class ValuesCheckerCoordinator implements Callable<RunData> {
|
||||
private long cmpTimeAccumulator = 0L;
|
||||
|
||||
public ValuesCheckerCoordinator(
|
||||
String specifier,
|
||||
int threads,
|
||||
int bufsize,
|
||||
long start,
|
||||
long end,
|
||||
boolean isolated) {
|
||||
String specifier,
|
||||
int threads,
|
||||
int bufsize,
|
||||
long start,
|
||||
long end,
|
||||
boolean printValues) {
|
||||
this.specifier = specifier;
|
||||
this.threads = threads;
|
||||
this.bufsize = bufsize;
|
||||
this.start = start;
|
||||
this.end = end;
|
||||
this.isolated = isolated;
|
||||
this.printValues = printValues;
|
||||
this.lock = new ReentrantLock();
|
||||
this.goTime = lock.newCondition();
|
||||
}
|
||||
@@ -76,53 +76,41 @@ public class ValuesCheckerCoordinator implements Callable<RunData> {
|
||||
|
||||
|
||||
private void testConcurrentValues(
|
||||
int threads,
|
||||
long start,
|
||||
long end,
|
||||
String mapperSpec) {
|
||||
int threads,
|
||||
long start,
|
||||
long end,
|
||||
String mapperSpec) {
|
||||
|
||||
// Generate reference values in single-threaded mode.
|
||||
DataMapper<Object> mapper =
|
||||
VirtData.getOptionalMapper(specifier).orElseThrow(
|
||||
() -> new RuntimeException("Unable to map function for specifier: " + specifier)
|
||||
);
|
||||
VirtData.getOptionalMapper(specifier).orElseThrow(
|
||||
() -> new RuntimeException("Unable to map function for specifier: " + specifier)
|
||||
);
|
||||
|
||||
final List<Object> reference = new CopyOnWriteArrayList<>();
|
||||
|
||||
// Setup concurrent generator pool
|
||||
ValuesCheckerExceptionHandler valuesCheckerExceptionHandler =
|
||||
new ValuesCheckerExceptionHandler(this);
|
||||
new ValuesCheckerExceptionHandler(this);
|
||||
IndexedThreadFactory tf =
|
||||
new IndexedThreadFactory("values-checker", valuesCheckerExceptionHandler);
|
||||
new IndexedThreadFactory("values-checker", valuesCheckerExceptionHandler);
|
||||
pool =
|
||||
Executors.newFixedThreadPool(threads, tf);
|
||||
Executors.newFixedThreadPool(threads, tf);
|
||||
|
||||
logger.info("Checking [{}..{}) in chunks of {}", start, end, bufsize);
|
||||
|
||||
if (!isolated) {
|
||||
logger.debug(() ->
|
||||
"Sharing data mapper, only expect success for " +
|
||||
"explicitly thread-safe generators.");
|
||||
}
|
||||
|
||||
for (int t = 0; t < threads; t++) {
|
||||
ValuesCheckerRunnable runnable;
|
||||
|
||||
if (isolated) {
|
||||
runnable = new ValuesCheckerRunnable(
|
||||
start, end, bufsize, t, mapperSpec, null,
|
||||
readyQueue, goTime, lock, reference
|
||||
DataMapper<?> threadMapper = VirtData.getOptionalMapper(mapperSpec)
|
||||
.orElseThrow(
|
||||
() -> new RuntimeException("Unable to map function for specifier: " + specifier)
|
||||
);
|
||||
} else {
|
||||
DataMapper<?> threadMapper = VirtData.getOptionalMapper(mapperSpec)
|
||||
.orElseThrow(
|
||||
() -> new RuntimeException("Unable to map function for specifier: " + specifier)
|
||||
);
|
||||
runnable = new ValuesCheckerRunnable(
|
||||
start, end, bufsize, t,null, threadMapper,
|
||||
readyQueue, goTime, lock, reference
|
||||
);
|
||||
}
|
||||
runnable = new ValuesCheckerRunnable(
|
||||
start, end, bufsize, t, null, threadMapper,
|
||||
readyQueue, goTime, lock, reference, printValues
|
||||
);
|
||||
pool.execute(runnable);
|
||||
}
|
||||
|
||||
@@ -182,7 +170,7 @@ public class ValuesCheckerCoordinator implements Callable<RunData> {
|
||||
|
||||
synchronized void handleException(Thread t, Throwable e) {
|
||||
this.errors.add(e);
|
||||
if (pool!=null) {
|
||||
if (pool != null) {
|
||||
pool.shutdownNow();
|
||||
}
|
||||
|
||||
@@ -215,14 +203,14 @@ public class ValuesCheckerCoordinator implements Callable<RunData> {
|
||||
public RunData call() throws Exception {
|
||||
run();
|
||||
return new RunData(
|
||||
this.specifier,
|
||||
this.threads,
|
||||
this.start,
|
||||
this.end,
|
||||
this.bufsize,
|
||||
this.isolated,
|
||||
((double) genTimeAccumulator / 1000000.0D),
|
||||
((double) cmpTimeAccumulator / 1000000.0D)
|
||||
this.specifier,
|
||||
this.threads,
|
||||
this.start,
|
||||
this.end,
|
||||
this.bufsize,
|
||||
this.printValues,
|
||||
((double) genTimeAccumulator / 1000000.0D),
|
||||
((double) cmpTimeAccumulator / 1000000.0D)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
* Copyright (c) 2022-2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -40,6 +40,7 @@ public class ValuesCheckerRunnable implements Runnable {
|
||||
private final int threadNum;
|
||||
private final ConcurrentLinkedQueue<Integer> readyQueue;
|
||||
private final int bufsize;
|
||||
private final boolean printValues;
|
||||
|
||||
public ValuesCheckerRunnable(
|
||||
long start,
|
||||
@@ -51,7 +52,8 @@ public class ValuesCheckerRunnable implements Runnable {
|
||||
ConcurrentLinkedQueue<Integer> readyQueue,
|
||||
Condition goTime,
|
||||
Lock lock,
|
||||
List<Object> expected
|
||||
List<Object> expected,
|
||||
boolean printValues
|
||||
) {
|
||||
this.start = start;
|
||||
this.end = end;
|
||||
@@ -61,6 +63,7 @@ public class ValuesCheckerRunnable implements Runnable {
|
||||
this.expected = expected;
|
||||
this.goTime = goTime;
|
||||
this.lock = lock;
|
||||
this.printValues = printValues;
|
||||
|
||||
this.mapper = (dataMapper != null) ? dataMapper : VirtData.getOptionalMapper(mapperSpec)
|
||||
.orElseThrow(
|
||||
@@ -78,19 +81,19 @@ public class ValuesCheckerRunnable implements Runnable {
|
||||
String rangeInfo = "t:" + threadNum + " [" + rangeStart + ".." + (rangeStart+bufsize) + ")";
|
||||
|
||||
synchronizeFor("generation start " + rangeInfo);
|
||||
// logger.debug(() -> "generating for " + "range: " + rangeStart + ".." + (rangeStart + bufsize));
|
||||
logger.debug("generating for " + "range: " + rangeStart + ".." + (rangeStart + bufsize));
|
||||
for (int i = 0; i < output.length; i++) {
|
||||
output[i] = mapper.get(i + rangeStart);
|
||||
// if (i==0) {
|
||||
// logger.debug(() -> "gen i:" + i + ", cycle: " + (i + rangeStart) + ": " + output[i]);
|
||||
// }
|
||||
if (i==0) {
|
||||
logger.debug("gen i:" + i + ", cycle: " + (i + rangeStart) + ": " + output[i]);
|
||||
}
|
||||
|
||||
}
|
||||
if (this.threadNum==0) {
|
||||
logger.trace(() -> "Thread " + threadNum + " putting values into comparable array before acking");
|
||||
expected.clear();
|
||||
expected.addAll(Arrays.asList(output));
|
||||
if (System.getProperties().containsKey("PRINTVALUES")) {
|
||||
if (printValues) {
|
||||
for (int i=0; i<output.length; i++) {
|
||||
System.out.println(start+i + "->" + output[i]);
|
||||
}
|
||||
@@ -99,7 +102,7 @@ public class ValuesCheckerRunnable implements Runnable {
|
||||
synchronizeFor("generation complete " + rangeInfo);
|
||||
|
||||
synchronizeFor("verification " + rangeInfo);
|
||||
// logger.debug(() -> "checker " + this.threadNum + " verifying range [" + start + ".." + (start + end) + ")");
|
||||
logger.debug(() -> "checker " + this.threadNum + " verifying range [" + start + ".." + (start + end) + ")");
|
||||
for (int bufidx = 0; bufidx < expected.size(); bufidx++) {
|
||||
if (!expected.get(bufidx).equals(output[bufidx])) {
|
||||
String errmsg = "Value differs: " +
|
||||
@@ -108,16 +111,12 @@ public class ValuesCheckerRunnable implements Runnable {
|
||||
|
||||
throw new RuntimeException(errmsg);
|
||||
}
|
||||
// else
|
||||
// {
|
||||
// System.out.println("Equal " + expected[bufidx] + " == " + output[bufidx]);
|
||||
// }
|
||||
}
|
||||
synchronizeFor("verification complete" + rangeInfo);
|
||||
|
||||
// logger.info(() -> "verified values for thread " + Thread.currentThread().getLibname() + " in range " +
|
||||
// rangeStart + ".." + (rangeStart + bufsize)
|
||||
// );
|
||||
logger.debug("verified values for thread " + Thread.currentThread() + " in range " +
|
||||
rangeStart + ".." + (rangeStart + bufsize)
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2022 nosqlbench
|
||||
* Copyright (c) 2022-2023 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -21,38 +21,56 @@ import io.nosqlbench.virtdata.core.bindings.VirtData;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
public class VirtDataCheckPerfApp {
|
||||
|
||||
private final static Logger logger = LogManager.getLogger(VirtDataCheckPerfApp.class);
|
||||
|
||||
public static void main(String[] args) {
|
||||
if (args.length==1) {
|
||||
checkperf(new String[]{args[0],"1","1","1","1"});
|
||||
} else if (args.length==5) {
|
||||
checkperf(args);
|
||||
} else {
|
||||
System.out.println(" ARGS: checkperf 'specifier' threads bufsize start end");
|
||||
System.out.println(" example: 'timeuuid()' 100 1000 0 10000");
|
||||
System.out.println(" specifier: A VirtData function specifier.");
|
||||
System.out.println(" threads: The number of concurrent threads to run.");
|
||||
System.out.println(" bufsize: The number of cycles to give each thread at a time.");
|
||||
System.out.println(" start: The start cycle for the test, inclusive.");
|
||||
System.out.println(" end: The end cycle for the test, exclusive.");
|
||||
System.out.println(" OR");
|
||||
System.out.println(" ARGS: diagnose 'specifier'");
|
||||
String spec="Identity()";
|
||||
int threads=1;
|
||||
int bufsize=1;
|
||||
long startCycle=0;
|
||||
long endCycle=1;
|
||||
boolean printValues=false;
|
||||
|
||||
if ((args.length>0) && args[args.length-1].equals("-p")) {
|
||||
printValues=true;
|
||||
args= Arrays.copyOfRange(args,0,args.length-1);
|
||||
}
|
||||
|
||||
switch (args.length) {
|
||||
case 5:
|
||||
endCycle=Integer.parseInt(args[4]);
|
||||
case 4:
|
||||
startCycle=Integer.parseInt(args[3]);
|
||||
case 3:
|
||||
bufsize=Integer.parseInt(args[2]);
|
||||
case 2:
|
||||
threads = Integer.parseInt(args[1]);
|
||||
case 1:
|
||||
spec = args[0];
|
||||
break;
|
||||
case 0:
|
||||
System.out.println(" ARGS: virtdata testmapper 'specifier' threads bufsize start end");
|
||||
System.out.println(" example: 'timeuuid()' 100 1000 0 10000");
|
||||
System.out.println(" specifier: A VirtData function specifier.");
|
||||
System.out.println(" threads: The number of concurrent threads to run.");
|
||||
System.out.println(" bufsize: The number of cycles to give each thread at a time.");
|
||||
System.out.println(" start: The start cycle for the test, inclusive.");
|
||||
System.out.println(" end: The end cycle for the test, exclusive.");
|
||||
System.out.println(" [-p]: print the values as a sanity check. (must appear last)");
|
||||
break;
|
||||
default:
|
||||
throw new RuntimeException("Error parsing args for " + String.join(" ",args));
|
||||
}
|
||||
|
||||
checkperf(spec,threads,bufsize,startCycle,endCycle,printValues);
|
||||
}
|
||||
|
||||
private static void checkperf(String[] args) {
|
||||
String spec = args[0];
|
||||
int threads = Integer.parseInt(args[1]);
|
||||
int bufsize = Integer.parseInt(args[2]);
|
||||
long start = Long.parseLong(args[3]);
|
||||
long end = Long.parseLong(args[4]);
|
||||
|
||||
boolean isolated = false;
|
||||
|
||||
ValuesCheckerCoordinator checker = new ValuesCheckerCoordinator(spec, threads, bufsize, start, end, isolated);
|
||||
private static void checkperf(String spec, int threads, int bufsize, long start, long end, boolean printValues) {
|
||||
ValuesCheckerCoordinator checker = new ValuesCheckerCoordinator(spec, threads, bufsize, start, end, printValues);
|
||||
|
||||
RunData runData;
|
||||
try {
|
||||
|
||||
Reference in New Issue
Block a user