Incremental name changes; use of builder; fix in normalize

This commit is contained in:
jeffbanks 2023-06-02 12:18:18 -05:00
parent 387cef4c17
commit b9d5008c5a
11 changed files with 152 additions and 35 deletions

View File

@ -20,8 +20,10 @@ import com.datastax.oss.driver.api.core.data.CqlVector;
import io.nosqlbench.virtdata.api.annotations.Categories;
import io.nosqlbench.virtdata.api.annotations.Category;
import io.nosqlbench.virtdata.api.annotations.ThreadSafeMapper;
import io.nosqlbench.virtdata.library.basics.shared.from_long.to_vector.NormalizeDoubleVectorList;
import io.nosqlbench.virtdata.library.basics.shared.from_long.to_vector.NormalizeFloatVectorList;
import io.nosqlbench.virtdata.library.basics.shared.from_long.to_vector.NormalizeDoubleListVector;
import io.nosqlbench.virtdata.library.basics.shared.from_long.to_vector.NormalizeFloatListVector;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
@ -33,29 +35,33 @@ import java.util.function.Function;
*/
@ThreadSafeMapper
@Categories(Category.experimental)
public class NormalizeVector implements Function<com.datastax.oss.driver.api.core.data.CqlVector ,List> {
private final NormalizeDoubleVectorList ndv = new NormalizeDoubleVectorList();
private final NormalizeFloatVectorList nfv = new NormalizeFloatVectorList();
public class NormalizeCqlVector implements Function<com.datastax.oss.driver.api.core.data.CqlVector, com.datastax.oss.driver.api.core.data.CqlVector> {
private final NormalizeDoubleListVector ndv = new NormalizeDoubleListVector();
private final NormalizeFloatListVector nfv = new NormalizeFloatListVector();
private final static Logger logger = LogManager.getLogger(NormalizeCqlVector.class);
@Override
public List apply(CqlVector cqlVector) {
public com.datastax.oss.driver.api.core.data.CqlVector apply(CqlVector cqlVector) {
CqlVector.Builder builder = CqlVector.builder();
Iterable values = cqlVector.getValues();
List<Object> list = new ArrayList<>();
values.forEach(list::add);
if (list.size()==0) {
return List.of();
if (list.isEmpty()) {
builder.add(List.of());
} else if (list.get(0) instanceof Float) {
List<Float> floats = new ArrayList<>();
list.forEach(o -> floats.add((Float)o));
return nfv.apply(floats);
list.forEach(o -> floats.add((Float) o));
builder.add(nfv.apply(floats));
} else if (list.get(0) instanceof Double) {
List<Double> doubles = new ArrayList<>();
list.forEach(o -> doubles.add((Double) o));
return ndv.apply(doubles);
builder.add(ndv.apply(doubles));
} else {
throw new RuntimeException("Only Doubles and Floats are recognized.");
}
return builder.build();
}
}

View File

@ -35,15 +35,10 @@ public class CqlVector implements LongFunction<com.datastax.oss.driver.api.core.
private final LongFunction<List<?>> func;
@Example({"CqlVector(ListSized(2,HashedRange(0.2f, 5.0f)","Create a 2-component vector with the given range of values."})
@Example({"CqlVector(ListSized(2,HashedRange(0.2f, 5.0f)", "Create a 2-component vector with the given range of values."})
public CqlVector(Object func) {
this.func = VirtDataConversions.adaptFunction(func,LongFunction.class, List.class);
this.func = VirtDataConversions.adaptFunction(func, LongFunction.class, List.class);
}
//
// @Example({"CqlVector()","Create a default 5-component vector with unit-interval components."})
// public CqlVector() {
// this(new ListSizedHashed(5, new HashRange(0.0f, 1.0f)));
// }
@Override
public com.datastax.oss.driver.api.core.data.CqlVector apply(long cycle) {

View File

@ -0,0 +1,94 @@
min_version: "5.17.3"
description: |
A workload with a float vector data type.
The CQL Key-Value workload demonstrates the simplest possible schema with payload data where value is of the float vector data type. This is useful for measuring
system capacity most directly in terms of raw operations. As a reference point, it provides some insight around types of
workloads that are constrained around messaging, threading, and tasking, rather than bulk throughput.
During preload, all keys are set with a value. During the main phase of the workload, random keys from the known
population are replaced with new values which never repeat. During the main phase, random partitions are selected for
upsert, with row values never repeating.
scenarios:
default:
schema: run driver=cql tags==block:schema threads==1 cycles==UNDEF
rampup: run driver=cql tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=100
main: run driver=cql tags=='block:main.*' cycles===TEMPLATE(main-cycles,10000000) threads=5
drop-tables:
schema: run driver=cql tags==block:drop-tables threads==1 cycles==UNDEF
truncate: run driver=cql tags==block:truncate-tables cycles===1 threads=1
reads: run driver=cql tags==block:main-read cycles===TEMPLATE(main-cycles,10000000) threads=auto
bindings:
seq_key: Mod(<<keycount:1000000000>>); ToString() -> String
seq_value: Hash(); Mod(<<valuecount:1000000000>>); ToString() -> String
rw_key: <<keydist:Uniform(0,1000000000)->int>>; ToString() -> String
rw_value: Hash(); <<valdist:Uniform(0,1000000000)->int>>; ToString() -> String
vector_value: CqlVector(ListSizedHashed(<<dimensions:5>>,HashRange(0.0f,100.0f)); NormalizeCqlVector();
blocks:
drop-tables:
ops:
drop-table-vectors:
raw: |
DROP TABLE IF EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors);
schema:
params:
prepared: false
ops:
create-keyspace:
raw: |
CREATE KEYSPACE IF NOT EXISTS TEMPLATE(keyspace,baselines)
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '<<rf:1>>'};
create-table:
raw: |
CREATE TABLE IF NOT EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) (
key TEXT,
value vector<float,<<dimensions:5>>>,
PRIMARY KEY (key)
);
create-sai-index:
raw: |
CREATE CUSTOM INDEX IF NOT EXISTS ON TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) (value) USING 'StorageAttachedIndex';
truncate-tables:
params:
prepared: false
ops:
truncate-vectors:
raw: |
TRUNCATE TABLE TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors);
rampup:
params:
cl: TEMPLATE(write_cl,LOCAL_QUORUM)
ops:
rampup-insert:
prepared: |
INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors)
(key, value) VALUES ({seq_key},{vector_value});
main-read:
params:
ratio: TEMPLATE(read_ratio,90)
cl: TEMPLATE(read_cl,LOCAL_QUORUM)
instrument: true
ops:
main-select-ann-limit:
prepared: |
SELECT * FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) ORDER BY value ANN OF {vector_value} LIMIT TEMPLATE(select_limit,2);
main-select-pk-ann-limit:
prepared: |
SELECT * FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) WHERE KEY={rw_key} ORDER BY value ANN OF {vector_value} LIMIT TEMPLATE(select_limit,2);
main-write:
params:
ratio: TEMPLATE(write_ratio,10)
cl: TEMPLATE(write_cl,LOCAL_QUORUM)
instrument: true
ops:
main-insert:
prepared: |
INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors)
(key, value) VALUES ({rw_key}, {vector_value});

View File

@ -28,7 +28,7 @@ bindings:
seq_value: Hash(); Mod(<<valuecount:1000000000>>); ToString() -> String
rw_key: <<keydist:Uniform(0,1000000000)->int>>; ToString() -> String
rw_value: Hash(); <<valdist:Uniform(0,1000000000)->int>>; ToString() -> String
vector_value: CqlVector(ListSizedHashed(<<dimensions:5>>,HashRange(0.0f,100.0f) -> float)); NormalizeVector();
vector_value: CqlVector(ListSizedHashed(<<dimensions:5>>,HashRange(0.0f,100.0f) -> float)); NormalizeCqlVector();
blocks:
drop-tables:
ops:

View File

@ -15,6 +15,6 @@ bindings:
# create a HOF CqlVector binding
hof_vary_vector: CqlVector(ListSizedHashed(HashRange(3,5)->int,HashRange(0.0f,1.0f)))
# create a normalized vectors of dimension 10
hof_ten_unit: CqlVector(ListSizedHashed(10,HashRange(0.0f,1.0f))); NormalizeVector();
hof_ten_unit: CqlVector(ListSizedHashed(10,HashRange(0.0f,1.0f))); NormalizeCqlVector();

View File

@ -81,7 +81,12 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-statistics-distribution</artifactId>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.15.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
</project>

View File

@ -17,12 +17,16 @@
package io.nosqlbench.virtdata.api.bindings;
import io.nosqlbench.api.errors.BasicError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.lang.reflect.*;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.TypeVariable;
import java.security.InvalidParameterException;
import java.util.*;
import java.util.List;
import java.util.function.*;
import java.util.function.LongFunction;
public class VirtDataConversions {
@ -33,7 +37,7 @@ public class VirtDataConversions {
LongFunction(LongFunction.class, long.class, Object.class),
LongUnaryOperator(java.util.function.LongUnaryOperator.class, long.class, long.class),
IntFunction(java.util.function.IntFunction.class, int.class, Object.class),
IntToDoubleFunction(java.util.function.IntToDoubleFunction.class,int.class,double.class),
IntToDoubleFunction(java.util.function.IntToDoubleFunction.class, int.class, double.class),
IntToLongFunction(java.util.function.IntToLongFunction.class, int.class, long.class),
IntUnaryOperator(java.util.function.IntUnaryOperator.class, int.class, int.class),
DoubleFunction(java.util.function.DoubleFunction.class, double.class, Object.class),
@ -46,6 +50,7 @@ public class VirtDataConversions {
private final Class<?> inputClazz;
private final Class<?> outputClazz;
FuncType(Class<?> functionClazz, Class<?> inputClazz, Class<?> outputClazz) {
this.functionClazz = functionClazz;
this.inputClazz = inputClazz;
@ -63,6 +68,8 @@ public class VirtDataConversions {
}
private static final Logger logger = LogManager.getLogger(VirtDataConversions.class);
public static <F, T> List<T> adaptFunctionList(F[] funcs, Class<T> functionType, Class<Object>... resultSignature) {
List<T> functions = new ArrayList<>();
for (Object func : funcs) {
@ -83,6 +90,7 @@ public class VirtDataConversions {
* @return An instance of T
*/
public static <F, T> T adaptFunction(F func, Class<T> functionType, Class<?>... resultSignature) {
FuncType funcType = FuncType.valueOf(func.getClass());
List<Class<?>> signature = new ArrayList<>();
@ -96,6 +104,7 @@ public class VirtDataConversions {
signature.addAll(fromSignature);
signature.addAll(toSignature);
logger.debug("Adapting function from " + fromSignature + " to " + toSignature);
if (fromSignature.equals(toSignature)) {
return (T) func;
}
@ -108,8 +117,11 @@ public class VirtDataConversions {
Method adapter = null;
Class<?> hostclass = NBFunctionConverter.class;
try {
logger.debug("Looking for adapter method for " + hostclass.getCanonicalName() + " with signature " + signature);
adapter = NBFunctionConverter.class.getMethod("adapt", methodSignature);
} catch (NoSuchMethodException e) {
logger.debug("No adapter method found for " + hostclass.getCanonicalName() + " with signature " + signature);
StringBuilder example = new StringBuilder();
@ -134,6 +146,8 @@ public class VirtDataConversions {
}
logger.debug("Found adapter method for " + hostclass.getCanonicalName() + " with signature " + signature);
FuncType fromType = FuncType.valueOf(func.getClass());
if (fromType.functionClazz.getTypeParameters().length > 0) {
TypeVariable<? extends Class<?>>[] funcParms = func.getClass().getTypeParameters();
@ -148,6 +162,8 @@ public class VirtDataConversions {
T result = null;
try {
logger.debug("Invoking adapter method for " + hostclass.getCanonicalName() + " with signature "
+ signature + " and args " + Arrays.toString(args));
result = (T) adapter.invoke(null, args);
return result;
} catch (IllegalArgumentException e) {
@ -161,9 +177,10 @@ public class VirtDataConversions {
/**
* Slice the incoming object list into a set of functions, based on a grouping interval and an offset.
* @param mod The grouping interval, or modulo to slice the function groups into
*
* @param mod The grouping interval, or modulo to slice the function groups into
* @param offset The offset within the group for the provided function
* @param funcs A list of source objects to convert to functions.
* @param funcs A list of source objects to convert to functions.
* @return
*/
public static <T> List<T> getFunctions(int mod, int offset, Class<? extends T> functionType, Object... funcs) {
@ -171,7 +188,7 @@ public class VirtDataConversions {
// throw new RuntimeException("uneven division of functions, where multiples of " + mod + " are expected.");
// }
List<T> functions = new ArrayList<>();
for (int i = offset; i < funcs.length; i+=mod) {
for (int i = offset; i < funcs.length; i += mod) {
Object func = funcs[i];
T longFunction = VirtDataConversions.adaptFunction(func, functionType, Object.class);
functions.add(longFunction);

View File

@ -29,7 +29,7 @@ import java.util.function.Function;
*/
@ThreadSafeMapper
@Categories(Category.experimental)
public class NormalizeDoubleVectorList implements Function<List<Double>,List<Double>> {
public class NormalizeDoubleListVector implements Function<List<Double>,List<Double>> {
@Override
public List<Double> apply(List<Double> doubles) {
ArrayList<Double> unit = new ArrayList<>(doubles.size());

View File

@ -29,7 +29,7 @@ import java.util.function.Function;
*/
@ThreadSafeMapper
@Categories(Category.experimental)
public class NormalizeFloatVectorList implements Function<List<Float>,List<Float>> {
public class NormalizeFloatListVector implements Function<List<Float>,List<Float>> {
@Override
public List<Float> apply(List<Float> floats) {
ArrayList<Float> unit = new ArrayList<>(floats.size());

View File

@ -29,9 +29,9 @@ import java.util.function.Function;
*/
@ThreadSafeMapper
@Categories(Category.experimental)
public class NormalizeVector implements Function<List,List> {
private final NormalizeDoubleVectorList ndv = new NormalizeDoubleVectorList();
private final NormalizeFloatVectorList nfv = new NormalizeFloatVectorList();
public class NormalizeListVector implements Function<List,List> {
private final NormalizeDoubleListVector ndv = new NormalizeDoubleListVector();
private final NormalizeFloatListVector nfv = new NormalizeFloatListVector();
@Override
public List apply(List list) {
@ -39,7 +39,7 @@ public class NormalizeVector implements Function<List,List> {
return List.of();
} else if (list.get(0) instanceof Float) {
return nfv.apply(list);
} else if (list.get(1) instanceof Double) {
} else if (list.get(0) instanceof Double) {
return ndv.apply(list);
} else {
throw new RuntimeException("Only Doubles and Floats are recognized.");

View File

@ -27,7 +27,7 @@ public class ToNormalizedVectorTest {
@Test
public void testNormalizeBasic() {
NormalizeDoubleVectorList normalize = new NormalizeDoubleVectorList();
NormalizeDoubleListVector normalize = new NormalizeDoubleListVector();
List<Double> normalized = normalize.apply(List.of(1.0d));
for (int i = 0; i < normalized.size(); i++) {
assertThat(normalized.get(i)).isCloseTo(1.0d, Offset.offset(0.00001d));