provide easy data streams for testing in API

This commit is contained in:
Jonathan Shook 2020-07-27 20:02:58 -05:00
parent 392bbcc595
commit 7578e91d77
12 changed files with 381 additions and 0 deletions

View File

@ -0,0 +1,51 @@
package io.nosqlbench.virtdata.userlibs.streams;
import io.nosqlbench.virtdata.library.basics.shared.from_long.to_bytebuffer.HashedToByteBuffer;
import io.nosqlbench.virtdata.userlibs.streams.fillers.ChunkedByteBuffer;
import io.nosqlbench.virtdata.userlibs.streams.fillers.LongFunctionIterable;
import io.nosqlbench.virtdata.userlibs.streams.pojos.ByteBufferObject;
import org.jetbrains.annotations.NotNull;
import java.nio.ByteBuffer;
import java.util.Iterator;
public class ByteBufferStreams {
public static Iterable<ByteBufferObject> byteBufferObjects(long startCycle, long endCycle, int bufsize) {
HashedToByteBuffer htbb = new HashedToByteBuffer(bufsize);
LongFunctionIterable<ByteBufferObject> bbi = new LongFunctionIterable<>(startCycle, endCycle, l -> new ByteBufferObject(htbb.apply(l)));
return bbi;
// LongFunctionIterable<ByteBuffer> byteBuffers = new LongFunctionIterable<>(0L, new HashedToByteBuffer(bufsize));
}
public static Iterable<ByteBuffer> byteBuffers(long startCycle, long endCycle, int bufsize) {
HashedToByteBuffer htbb = new HashedToByteBuffer(bufsize);
LongFunctionIterable<ByteBuffer> bbi = new LongFunctionIterable<>(startCycle, endCycle, htbb);
return bbi;
}
public static Iterable<ByteBuffer> partialByteBuffers(int startCycle, int endCycle, int bufSize) {
Iterable<ByteBuffer> byteBuffers = byteBuffers(startCycle, endCycle, bufSize);
return new ChunkedByteBuffer(byteBuffers);
}
private final static class ByteBufferObjectIterable implements Iterable<ByteBufferObject> {
@NotNull
@Override
public Iterator<ByteBufferObject> iterator() {
return new ByteBufferObjectIterator();
}
private final static class ByteBufferObjectIterator implements Iterator<ByteBufferObject> {
@Override
public boolean hasNext() {
return false;
}
@Override
public ByteBufferObject next() {
return null;
}
}
}
}

View File

@ -0,0 +1,9 @@
package io.nosqlbench.virtdata.userlibs.streams;
import java.nio.ByteBuffer;
public class VirtDataStreams {
public static Iterable<ByteBuffer> byteBuffers(long startCycle, long endCycle, int bufsize) {
return ByteBufferStreams.byteBuffers(startCycle, endCycle, bufsize);
}
}

View File

@ -0,0 +1,35 @@
package io.nosqlbench.virtdata.userlibs.streams.fillers;
import java.nio.ByteBuffer;
import java.util.Iterator;
/**
* If a type implements this interface, then it wants to be provided with a data source in the form of a series of byte
* buffers. The source must provide additional byte buffers as they are requested. These byte buffers aren't required to
* be any particular size. They aren't required to have distinct or special data. All that is required is that they
* never run out.
*/
public interface ByteBufferFillable extends Fillable {
void fill(Iterable<ByteBuffer> source);
static void fillByteBuffer(ByteBuffer target, Iterable<ByteBuffer> source) {
Iterator<ByteBuffer> iterator = source.iterator();
target.clear();
while (target.remaining() > 0 && iterator.hasNext()) {
ByteBuffer next = iterator.next();
if (next.remaining() > target.remaining()) {
byte[] bytes = new byte[target.remaining()];
next.get(bytes, 0, target.remaining());
target.put(bytes);
} else {
target.put(next);
}
if (target.remaining() == 0) {
break;
}
}
target.flip();
}
}

View File

@ -0,0 +1,12 @@
package io.nosqlbench.virtdata.userlibs.streams.fillers;
import io.nosqlbench.virtdata.library.basics.shared.from_long.to_bytebuffer.HashedToByteBuffer;
import java.nio.ByteBuffer;
public class ByteBufferSource extends LongFunctionIterable<ByteBuffer> {
public ByteBufferSource(long startCycle, long endCycle, int bufsize) {
super(startCycle, endCycle, new HashedToByteBuffer(bufsize));
}
}

View File

@ -0,0 +1,86 @@
package io.nosqlbench.virtdata.userlibs.streams.fillers;
import org.jetbrains.annotations.NotNull;
import java.nio.ByteBuffer;
import java.util.Iterator;
/**
* <H2>Synopsis</H2>
*
* <p>This iterator breaks some rules! The intent of this iterator is to
* make it programmatically easy to consume raw data in ByteBuffer mode
* without throwing away generated data. The efficiency loss of throwing
* away data is variable, but two reason are used as premise for this
* approach: 1) generation is not free and 2) the buf size mismatch
* between producer and consumer could be very high, amplifying the generation
* cost of data.</p>
*
* <p>Yet, the iterable pattern is very easy to integrate with, and so long
* as the user understands what this iterable does, it should make things
* easier by far than *not* having this helper class.</p>
*
* <p>All this iterator does is wrap another iterator and cache the current
* ByteBuffer, re-issuing it until it is consumed fully.</p>
*
* <H2>Usage Patterns</H2>
*
* This is intended to be called as an interative data source, where another
* iteration controls flow.
*
* <H2>Warnings</H2>
*
* This class is not thread safe. Either wrap it in a ThreadLocal with appropriate
* initialization for concurrent use, or make sure no concurrent access occurs.
*/
public class ChunkedByteBuffer implements Iterable<ByteBuffer> {
private final Iterable<ByteBuffer> source;
public ChunkedByteBuffer(Iterable<ByteBuffer> source) {
this.source = source;
}
@NotNull
@Override
public Iterator<ByteBuffer> iterator() {
return new ChunkedByteBufferIterator(source.iterator());
}
private static class ChunkedByteBufferIterator implements Iterator<ByteBuffer> {
private final Iterator<ByteBuffer> sourceIter;
private ByteBuffer buf;
int chunks;
public ChunkedByteBufferIterator(Iterator<ByteBuffer> iterator) {
this.sourceIter = iterator;
}
@Override
public boolean hasNext() {
if (buf != null && buf.remaining() <= 0) {
buf = null;
}
if (buf == null) {
if (sourceIter.hasNext()) {
buf = sourceIter.next();
chunks++;
}
}
return buf != null;
}
@Override
public ByteBuffer next() {
return buf;
}
public String toString() {
return "chunk " + chunks + " / position " + (buf==null ? "NULL" : buf.position());
}
}
}

View File

@ -0,0 +1,27 @@
package io.nosqlbench.virtdata.userlibs.streams.fillers;
import java.nio.ByteBuffer;
import java.util.List;
/**
* A fillable object needs to be filled with data. It may also have owned properties or objects that also need to be
* filled with data. A type must be fillable if it is to be traversed to find other fillable elements.
*
* Fillable elements should expect to have whatever data they contain be replaced when they are filled.
*/
public interface Fillable {
default List<Fillable> getFillables() {
return List.of();
}
static void fill(Fillable fillable, ByteBufferSource source) {
if (fillable instanceof ByteBufferFillable) {
((ByteBufferFillable) fillable).fill(source);
} else {
throw new RuntimeException("Unknown fillable type " + fillable.getClass().getCanonicalName());
}
for (Fillable fillableFillable : fillable.getFillables()) {
fill(fillableFillable, source);
}
}
}

View File

@ -0,0 +1,27 @@
package io.nosqlbench.virtdata.userlibs.streams.fillers;
import org.jetbrains.annotations.NotNull;
import java.util.Iterator;
public class IterableFiller<T> implements Iterable<Fillable> {
@NotNull
@Override
public Iterator<Fillable> iterator() {
return new FillerIterator<Fillable>();
}
private static class FillerIterator<T> implements Iterator<Fillable> {
@Override
public boolean hasNext() {
return false;
}
@Override
public Fillable next() {
return null;
}
}
}

View File

@ -0,0 +1,47 @@
package io.nosqlbench.virtdata.userlibs.streams.fillers;
import org.jetbrains.annotations.NotNull;
import java.util.Iterator;
import java.util.function.LongFunction;
public class LongFunctionIterable<T> implements Iterable<T> {
private final long startCycle;
private final LongFunction<T> f;
private final long endCycle;
public LongFunctionIterable(long startCycle,long endCycle, LongFunction<T> f) {
this.startCycle = startCycle;
this.endCycle = endCycle;
this.f = f;
}
@NotNull
@Override
public Iterator<T> iterator() {
return new LongFunctionIterator(startCycle, endCycle, f);
}
private class LongFunctionIterator implements Iterator<T> {
private final long endCycle;
private long cycle;
private final LongFunction<T> f;
public LongFunctionIterator(long startCycle, long endCycle, LongFunction<T> f) {
this.cycle = startCycle;
this.endCycle = endCycle;
this.f = f;
}
@Override
public boolean hasNext() {
return cycle<endCycle;
}
@Override
public T next() {
return f.apply(cycle++);
}
}
}

View File

@ -0,0 +1,5 @@
/**
* This package contains pre-designed streams of objects for the purpose
* of testing other libraries and tools.
*/
package io.nosqlbench.virtdata.userlibs.streams;

View File

@ -0,0 +1,28 @@
package io.nosqlbench.virtdata.userlibs.streams.pojos;
import io.nosqlbench.virtdata.userlibs.streams.fillers.ByteBufferFillable;
import io.nosqlbench.virtdata.userlibs.streams.fillers.Fillable;
import java.nio.ByteBuffer;
import java.util.List;
public class ByteBufferObject implements ByteBufferFillable {
private final ByteBuffer buffer;
public ByteBufferObject(int size) {
this.buffer = ByteBuffer.allocate(size);
}
public ByteBufferObject(ByteBuffer source) {
this.buffer = source;
}
public ByteBuffer getBuffer() {
return buffer;
}
@Override
public void fill(Iterable<ByteBuffer> source) {
ByteBufferFillable.fillByteBuffer(this.buffer,source);
}
}

View File

@ -0,0 +1,26 @@
package io.nosqlbench.virtdata.userlibs.streams.fillers;
import io.nosqlbench.virtdata.userlibs.streams.ByteBufferStreams;
import org.junit.Test;
import java.nio.ByteBuffer;
import static org.assertj.core.api.Assertions.assertThat;
public class ChunkedByteBufferTest {
@Test
public void testChunkedByteBufferx10() {
Iterable<ByteBuffer> byteBuffers = ByteBufferStreams.partialByteBuffers(0, 100, 100);
byte[] buf37 = new byte[37];
int count = 0;
for (ByteBuffer byteBuffer : byteBuffers) {
byteBuffer.get(buf37,0,Math.min(buf37.length,byteBuffer.remaining()));
count++;
}
assertThat(count).isEqualTo(300); // Each 100 byte buffer takes 3 rounds to consume
}
}

View File

@ -0,0 +1,28 @@
package io.nosqlbench.virtdata.userlibs.streams.pojos;
import io.nosqlbench.virtdata.userlibs.streams.fillers.ByteBufferSource;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
public class ByteBufferFillableTest {
@Test
public void testBytesFillableFromLargeBuffers() {
ByteBufferObject a = new ByteBufferObject(537);
ByteBufferSource byteBuffers = new ByteBufferSource(0, Long.MAX_VALUE, 1024 * 1024);
a.fill(byteBuffers);
assertThat(a.getBuffer().capacity()).isEqualTo(537);
assertThat(a.getBuffer().position()).isEqualTo(0);
}
@Test
public void testBytesFillableFromSmallBuffers() {
ByteBufferObject a = new ByteBufferObject(537);
ByteBufferSource byteBuffers = new ByteBufferSource(0, Long.MAX_VALUE, 37);
a.fill(byteBuffers);
assertThat(a.getBuffer().capacity()).isEqualTo(537);
assertThat(a.getBuffer().position()).isEqualTo(0);
}
}