add values summarizer to help debug bindings

This commit is contained in:
Jonathan Shook 2022-08-16 00:43:03 -05:00
parent a0344d65c9
commit 80fdb25301
4 changed files with 423 additions and 0 deletions

View File

@ -0,0 +1,69 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.virtdata.userlibs.apps.summarizer;
import java.util.DoubleSummaryStatistics;
import java.util.Map;
import java.util.function.ToDoubleFunction;
/**
 * Accumulates summary statistics (count, min, max, average, sum) over a series of
 * objects, each of which is projected to a {@code double} by a caller-supplied function.
 *
 * <p>Instances carry an optional free-form {@code source} label which is used as the
 * prefix of {@link #toString()}.</p>
 *
 * @param <T> the type of object accepted by the projection function
 */
public class DataSetSummary<T> {

    /** Running statistics over every value added so far. */
    private final DoubleSummaryStatistics stats = new DoubleSummaryStatistics();

    /** Label describing where the values came from; {@code null} until set. */
    private String source;

    /** Projects an added object onto the double value that is actually summarized. */
    private final ToDoubleFunction<T> toDoubleF;

    /**
     * @param toDoubleF the function used to turn each added object into a double
     */
    public DataSetSummary(ToDoubleFunction<T> toDoubleF) {
        this.toDoubleF = toDoubleF;
    }

    /**
     * @param source a label describing the origin of the summarized values
     */
    public void setSource(String source) {
        this.source = source;
    }

    /**
     * @return the label set via {@link #setSource(String)}, or {@code null} if none was set
     */
    public String getSource() {
        return this.source;
    }

    /**
     * Project the given object to a double and fold it into the statistics.
     *
     * @param o the object to summarize; it must be compatible with the projection
     *          function, otherwise a {@link ClassCastException} surfaces when the
     *          function is applied
     */
    public void addObject(Object o) {
        // The cast is erased at runtime; the suppression is confined to this one local.
        @SuppressWarnings("unchecked")
        T typed = (T) o;
        stats.accept(toDoubleF.applyAsDouble(typed));
    }

    /** Fold an already-projected value into the statistics. */
    private void add(double value) {
        stats.accept(value);
    }

    /**
     * Render the source label followed by the statistics in a fixed field order
     * (count, min, max, average, sum), so that output is stable across runs.
     *
     * @return a single-line description such as
     *         {@code name: {count=2, min=1.0, max=3.0, average=2.0, sum=4.0}}
     */
    @Override
    public String toString() {
        return source + ": {"
            + "count=" + stats.getCount()
            + ", min=" + stats.getMin()
            + ", max=" + stats.getMax()
            + ", average=" + stats.getAverage()
            + ", sum=" + stats.getSum()
            + "}";
    }

    /**
     * Combine the statistics of two summaries into a new, independent statistics object.
     * Neither argument is modified.
     *
     * @param left  the first summary
     * @param right the second summary
     * @return a new statistics object covering the values of both summaries
     */
    public static DoubleSummaryStatistics reduce(DataSetSummary<?> left, DataSetSummary<?> right) {
        DoubleSummaryStatistics combined = new DoubleSummaryStatistics();
        combined.combine(left.getSummaryStats());
        combined.combine(right.getSummaryStats());
        return combined;
    }

    /**
     * @return the live statistics object backing this summary (not a copy)
     */
    public DoubleSummaryStatistics getSummaryStats() {
        return this.stats;
    }
}

View File

@ -0,0 +1,143 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.virtdata.userlibs.apps.summarizer;
import io.nosqlbench.virtdata.userlibs.apps.valuechecker.IndexedThreadFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.IntFunction;
/**
 * Runs a fixed number of tasks on a thread pool in two coordinated stages: every task
 * is held at a start barrier until all of them are ready, then released together, and
 * the caller of {@link #run()} then waits until every task has reached a completion
 * barrier.
 *
 * <p>Failures in worker threads are recorded and rethrown on the coordinating thread.
 * The thread pool is shut down when {@link #run()} returns or throws, so an instance
 * is intended to be run once.</p>
 *
 * <p>NOTE(review): the 10 second limit in {@link #coordinateFor(int, String)} applies
 * to the completion barrier as well as the start barrier, so it also caps how long the
 * tasks themselves may run. Confirm whether that is intended.</p>
 */
public class StageManager implements Thread.UncaughtExceptionHandler, Runnable {

    private final static Logger logger = LogManager.getLogger(StageManager.class);

    /** Failures observed in worker threads, in arrival order. */
    private final ConcurrentLinkedDeque<Throwable> errors = new ConcurrentLinkedDeque<>();
    private final IndexedThreadFactory tf;
    private final ExecutorService pool;
    /** Produces the task for a given thread index. */
    private final IntFunction<Runnable> tasks;
    private final int threads;

    Lock lock = new ReentrantLock();
    Condition goTime = lock.newCondition();

    /** One entry per worker currently parked at the active barrier. */
    private final ConcurrentLinkedQueue<Object> readyQueue = new ConcurrentLinkedQueue<>();

    /** Number of barriers released so far. Read and written only while holding {@link #lock}. */
    private long releases = 0L;

    /**
     * @param threads the number of tasks to run, which is also the size of the thread pool
     * @param tasks   a function which yields the task to run for each index in [0,threads)
     */
    public StageManager(int threads, IntFunction<Runnable> tasks) {
        this.threads = threads;
        this.tf = new IndexedThreadFactory("values-checker", this);
        this.pool = Executors.newFixedThreadPool(threads, tf);
        this.tasks = tasks;
    }

    /**
     * Submit all tasks, release them together once all are ready, and block until all
     * of them have completed. The pool is shut down afterwards in every case.
     *
     * @throws RuntimeException if synchronization times out or a worker failed
     */
    @Override
    public void run() {
        boolean finished = false;
        try {
            for (int i = 0; i < threads; i++) {
                Runnable runnable = tasks.apply(i);
                RunBox box = new RunBox(runnable, this);
                pool.submit(box);
            }
            coordinateFor(threads, "tasks");
            coordinateFor(threads, "completion");
            finished = true;
        } finally {
            if (finished) {
                pool.shutdown();
            } else {
                // Workers may still be parked at a barrier; interrupt them so they can exit.
                pool.shutdownNow();
            }
        }
    }

    /**
     * Wraps a task so that it parks at the start barrier before running and at the
     * completion barrier after running.
     */
    private final static class RunBox implements Runnable {
        private final Runnable inner;
        private final StageManager stage;

        public RunBox(Runnable inner, StageManager stage) {
            this.inner = inner;
            this.stage = stage;
        }

        @Override
        public void run() {
            try {
                logger.debug("blocking for start");
                stage.OnYourMarkGetSet(this);
                logger.debug("running");
                inner.run();
                logger.debug("blocking for completion");
                stage.OnYourMarkGetSet(this);
                logger.debug("returning");
            } catch (RuntimeException | Error e) {
                // Tasks are submitted via ExecutorService.submit, which captures any
                // failure in a Future nobody reads. Record it so the coordinator sees it.
                stage.errors.add(e);
                throw e;
            }
        }
    }

    /**
     * Register the caller as ready and block until the coordinator releases the
     * current barrier.
     *
     * @param forWhat a token identifying the waiter; it is added to the ready queue
     * @throws RuntimeException wrapping any failure while waiting, including interruption
     */
    public void OnYourMarkGetSet(Object forWhat) {
        lock.lock();
        try {
            long arrivedAt = releases;
            readyQueue.add(forWhat);
            logger.trace("awaiting signal for " + forWhat);
            // Loop so that a spurious wakeup does not release this waiter early.
            while (releases == arrivedAt) {
                goTime.await();
            }
        } catch (Throwable e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            System.out.println("error while synchronizing: " + e.getMessage());
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Wait until the given number of workers are parked at the barrier, then release
     * them all. Polls with an exponentially growing delay capped at 1024ms.
     *
     * @param concurrency the number of waiters required before releasing
     * @param forWhat     a description of the stage, used in log and error messages
     * @throws RuntimeException if the waiters do not arrive within 10 seconds, if a
     *                          worker reported a failure, or if the wait is interrupted
     */
    public void coordinateFor(int concurrency, String forWhat) {
        logger.trace("coordinating " + concurrency + " threads for " + forWhat);
        try {
            long delay = 1;
            long startedAt = System.currentTimeMillis();
            while (readyQueue.size() < concurrency) {
                long waitedFor = System.currentTimeMillis() - startedAt;
                if (waitedFor > 10000L) {
                    throw new RuntimeException("Waited for " + waitedFor + " millis and not synchronized yet for " + forWhat);
                }
                logger.debug("threads ready for " + forWhat + ": " + readyQueue.size() + ", delaying " + delay + "ms");
                Thread.sleep(delay);
                delay = Math.min(1024, delay * 2);
                throwInjectedExceptions();
            }
            releaseWaiters();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            logger.error("Error while signaling threads:", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Clear the ready queue and wake every parked waiter. The lock is acquired and
     * released here only, so a failure earlier in the coordinator never attempts to
     * unlock a lock it does not hold.
     */
    private void releaseWaiters() {
        lock.lock();
        try {
            readyQueue.clear();
            releases++;
            goTime.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * If any worker failure has been recorded, print all of them and throw a
     * RuntimeException wrapping the first one.
     */
    private synchronized void throwInjectedExceptions() {
        if (errors.peekFirst() != null) {
            int count = 0;
            for (Throwable error : errors) {
                System.out.print("EXCEPTION " + count++ + ": ");
                System.out.println(error.getMessage());
            }
            throw new RuntimeException(errors.peekFirst());
        }
    }

    /**
     * Record a failure which escaped a thread, so that the coordinator can report it
     * instead of timing out.
     */
    @Override
    public void uncaughtException(Thread t, Throwable e) {
        errors.add(e);
    }
}

View File

@ -0,0 +1,164 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.virtdata.userlibs.apps.summarizer;
import io.nosqlbench.api.spi.BundledApp;
import io.nosqlbench.nb.annotations.Service;
import io.nosqlbench.virtdata.core.bindings.DataMapper;
import io.nosqlbench.virtdata.core.bindings.VirtData;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import picocli.CommandLine;
import picocli.CommandLine.Command;
import picocli.CommandLine.Option;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.function.IntFunction;
import java.util.function.Supplier;
/**
 * A bundled command line app which evaluates a binding recipe over a range of cycles,
 * optionally spread over several threads, and prints summary statistics of the values
 * produced, per thread and combined.
 *
 * <p>Values are reduced to a double for summarizing: numeric values by their numeric
 * value, character sequences by their length, buffers by their remaining size, and
 * anything else by the length of its {@code toString()} form.</p>
 */
@Command(
    name = "summarize-values",
    description = "Summarize the range of values provided by a binding function",
    helpCommand = true,
    showDefaultValues = true
)
@Service(value = BundledApp.class, selector = "summarize-values")
public class ValueSummarizerApp implements BundledApp, Callable<Integer> {

    private final static Logger logger = LogManager.getLogger(ValueSummarizerApp.class);

    @Option(
        names = {"loglevel"},
        description = {"The level at which to log diagnostic lines."}
    )
    Level level = Level.DEBUG;

    @Option(
        names = {"cycles"},
        description = "The cycle range, in <count> or <start>..<end> format. This is a closed-open interval as in [x,y)"
    )
    String cycles = "1";

    @Option(
        names = {"binding"},
        description = "The binding recipe to test, as it would be found in a workload YAML"
    )
    String binding = "ToString()";

    @Option(
        names = {"type"},
        description = "The object type to assert on binding output. This is 'Object' by default."
    )
    Class<?> type = Object.class;

    @Option(
        names = {"threads"},
        description = "The number of threads to spread the cycles over"
    )
    int threads = 1;

    /** The resolved binding function; set in {@link #call()}. */
    private DataMapper<Object> mapper;

    /** Creates a fresh summary suited to the value type of the binding; set in {@link #call()}. */
    private Supplier<DataSetSummary<?>> summarySupplier;

    /** One summary per task, in thread index order. */
    private final List<DataSetSummary<?>> summaries = new ArrayList<>();

    public static void main(String[] args) {
        int result = new ValueSummarizerApp().applyAsInt(args);
        System.exit(result);
    }

    /**
     * Parse the arguments into a fresh app instance and execute it.
     *
     * @param args the command line arguments
     * @return the exit code reported by the command line framework
     */
    @Override
    public int applyAsInt(String[] args) {
        return new CommandLine(new ValueSummarizerApp()).execute(args);
    }

    /**
     * Resolve the binding, choose a summarizer from the type of a sample value, run
     * the tasks, and print the per-task and combined statistics.
     *
     * @return 0 on success
     * @throws RuntimeException if the binding cannot be resolved or a task stage fails
     */
    @Override
    public Integer call() throws Exception {
        this.mapper = VirtData.getOptionalMapper(binding, type, Map.of()).orElseThrow(
            () -> new RuntimeException("Unable to find a binding for '" + binding + "' of type '" + type.getSimpleName() + "'")
        );

        // The value produced for cycle 1 is used as a sample to select the summarizer.
        Object value = mapper.get(1L);
        if (value instanceof Character) {
            summarySupplier = () -> new DataSetSummary<Character>(c -> c);
        } else if (value instanceof Integer) {
            summarySupplier = () -> new DataSetSummary<Integer>(i -> i);
        } else if (value instanceof Short) {
            summarySupplier = () -> new DataSetSummary<Short>(s -> s);
        } else if (value instanceof Float) {
            summarySupplier = () -> new DataSetSummary<Float>(f -> f);
        } else if (value instanceof Long) {
            summarySupplier = () -> new DataSetSummary<Long>(l -> l);
        } else if (value instanceof CharBuffer) {
            // Checked before CharSequence, which CharBuffer also implements.
            summarySupplier = () -> new DataSetSummary<>(CharBuffer::remaining);
        } else if (value instanceof CharSequence) {
            summarySupplier = () -> new DataSetSummary<>(CharSequence::length);
        } else if (value instanceof ByteBuffer) {
            summarySupplier = () -> new DataSetSummary<>(ByteBuffer::remaining);
        } else if (value instanceof Number) {
            summarySupplier = () -> new DataSetSummary<>(Number::doubleValue);
        } else {
            logger.warn("Using default 'toString().length()' summarizer for type " + type.getSimpleName());
            summarySupplier = () -> new DataSetSummary<>(o -> (long) o.toString().length());
        }

        IntFunction<Runnable> tasks = this::taskForThreadIdx;
        StageManager stage = new StageManager(threads, tasks);
        stage.run();

        // Accumulate into a fresh object so that no per-task summary is modified.
        DoubleSummaryStatistics combined = new DoubleSummaryStatistics();
        for (DataSetSummary<?> summary : summaries) {
            System.out.println(summary);
            logger.log(level, summary);
            combined.combine(summary.getSummaryStats());
        }
        System.out.println("combined:" + combined);
        return 0;
    }

    /**
     * Create the task for one thread index, covering that thread's share of the
     * cycle range, and register its summary.
     */
    private Runnable taskForThreadIdx(int idx) {
        DataSetSummary<?> summary = summarySupplier.get();
        summaries.add(summary);
        long rangeStart = startCycle();
        long rangeEnd = endCycle();
        long start = computeOffset(rangeStart, rangeEnd, threads, idx);
        long end = computeOffset(rangeStart, rangeEnd, threads, idx + 1);
        return new ValuesTask(start, end, mapper, summary);
    }

    /**
     * Compute the absolute cycle at which the given slot begins when the interval
     * [startIncl,endExcl) is divided among the participants. Any remainder is spread
     * one cycle each over the lowest-numbered slots. Slot {@code participants}
     * yields {@code endExcl}.
     *
     * @param startIncl    the first cycle of the whole interval
     * @param endExcl      the cycle just past the end of the whole interval
     * @param participants the number of slots the interval is divided into
     * @param slot         the slot index, in [0,participants]
     * @return the first cycle belonging to the slot
     */
    private long computeOffset(long startIncl, long endExcl, int participants, int slot) {
        long total = endExcl - startIncl;
        long div = total / participants;
        long mod = total % participants;
        long offset = div * slot + Math.min(slot, mod);
        // The offset is relative to the interval, so it must be rebased onto its start.
        return startIncl + offset;
    }

    /** @return the exclusive end of the cycle range parsed from the cycles option */
    private long endCycle() {
        return Long.parseLong(cycles.contains("..") ? cycles.substring(cycles.indexOf("..") + 2) : cycles);
    }

    /** @return the inclusive start of the cycle range, which is 0 for the {@code <count>} form */
    private long startCycle() {
        return Long.parseLong(cycles.contains("..") ? cycles.substring(0, cycles.indexOf("..")) : "0");
    }
}

View File

@ -0,0 +1,47 @@
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.virtdata.userlibs.apps.summarizer;
import io.nosqlbench.virtdata.core.bindings.DataMapper;
/**
 * A task which evaluates a data mapper for every cycle in a closed-open interval and
 * feeds each result into a summary.
 */
public class ValuesTask implements Runnable {

    private final long startIncl;
    private final long endExcl;
    private final DataMapper<Object> mapper;
    private final DataSetSummary<?> summary;

    /**
     * @param startIncl the first cycle to evaluate
     * @param endExcl   the cycle just past the last one to evaluate
     * @param mapper    the function which produces a value for each cycle
     * @param summary   the summary which receives every produced value
     */
    public ValuesTask(long startIncl, long endExcl, DataMapper<Object> mapper, DataSetSummary<?> summary) {
        this.startIncl = startIncl;
        this.endExcl = endExcl;
        this.mapper = mapper;
        this.summary = summary;
    }

    /**
     * Label the summary with the current thread name and cycle range, then add the
     * mapped value of every cycle in the range to it.
     */
    @Override
    public void run() {
        // No trailing separator here: the summary's toString() already puts ": "
        // between the source label and the statistics.
        summary.setSource(Thread.currentThread().getName() + "[" + getRange() + ")");
        for (long cycle = startIncl; cycle < endExcl; cycle++) {
            Object mapped = mapper.apply(cycle);
            summary.addObject(mapped);
        }
    }

    /**
     * @return the cycle range of this task in {@code start..end} form
     */
    public String getRange() {
        return startIncl + ".." + endExcl;
    }
}