basic schema (+vectors), rampup (+vectors), and read support

Jonathan Shook 2024-05-10 19:16:03 -05:00
parent 101e9d93fa
commit 0edc60e9d0
10 changed files with 179 additions and 110 deletions

@@ -1,21 +1,5 @@
<!--
~ Copyright (c) 2024 nosqlbench
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="astra DAPI rampup" type="JarApplication" folderName="Astra DAPI">
<configuration default="false" name="astra DAPI rampup threads=1" type="JarApplication" folderName="Astra DAPI">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
@@ -28,4 +12,4 @@
<option name="ALTERNATIVE_JRE_PATH" value="21" />
<method v="2" />
</configuration>
</component>
</component>

@@ -0,0 +1,15 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="astra DAPI read" type="JarApplication" folderName="Astra DAPI">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
<option name="PROGRAM_PARAMETERS" value="astra_kv_dapi default.read threads=1 astraTokenFile=target/token astraApiEndpointFile=target/endpoint -v" />
<option name="WORKING_DIRECTORY" value="$ProjectFileDir$/local/dataapi" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />
<option name="ALTERNATIVE_JRE_PATH" value="21" />
<method v="2" />
</configuration>
</component>

@@ -0,0 +1,15 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="astra DAPI schema collection=baselines" type="JarApplication" folderName="Astra DAPI">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
<option name="PROGRAM_PARAMETERS" value="astra_kv_dapi default.schema collection=baselines astraTokenFile=target/token astraApiEndpointFile=target/endpoint -v" />
<option name="WORKING_DIRECTORY" value="$ProjectFileDir$/local/dataapi" />
<option name="ALTERNATIVE_JRE_PATH_ENABLED" value="true" />
<option name="ALTERNATIVE_JRE_PATH" value="21" />
<method v="2" />
</configuration>
</component>

@@ -55,11 +55,13 @@ public class DataApiFindOpDispenser extends DataApiOpDispenser {
private FindOptions getFindOptions(ParsedOp op, long l) {
FindOptions options = new FindOptions();
Sort sort = getSortFromOp(op, l);
float[] vector = getVectorValues(op, l);
if (sort != null) {
options = vector != null ? options.sort(vector, sort) : options.sort(sort);
} else if (vector != null) {
options = options.sort(vector);
if (op.isDefined("vector")) {
float[] vector = getVectorValues(op, l);
if (sort != null) {
options = vector != null ? options.sort(vector, sort) : options.sort(sort);
} else if (vector != null) {
options = options.sort(vector);
}
}
Projection[] projection = getProjectionFromOp(op, l);
if (projection != null) {

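Merged, the new getFindOptions body from the hunk above reads roughly as follows (a sketch; getSortFromOp, getVectorValues, and getProjectionFromOp are helpers on the surrounding dispenser class). Note that, as written, a sort with no vector defined falls outside the new guard and is not applied; whether that is intended is not clear from the commit:

    private FindOptions getFindOptions(ParsedOp op, long l) {
        FindOptions options = new FindOptions();
        Sort sort = getSortFromOp(op, l);
        // Vector values are only resolved when the op template defines them,
        // so non-vector find ops no longer attempt vector parsing.
        if (op.isDefined("vector")) {
            float[] vector = getVectorValues(op, l);
            if (sort != null) {
                options = vector != null ? options.sort(vector, sort) : options.sort(sort);
            } else if (vector != null) {
                options = options.sort(vector);
            }
        }
        // ... projection handling and return continue as in the original method
    }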
@@ -24,6 +24,7 @@ import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.function.LongFunction;
public class DataApiInsertOneOpDispenser extends DataApiOpDispenser {
@@ -36,12 +37,13 @@ public class DataApiInsertOneOpDispenser extends DataApiOpDispenser {
}
private LongFunction<DataApiInsertOneOp> createOpFunction(ParsedOp op) {
LongFunction<Map> docMapFunc = op.getAsRequiredFunction("document", Map.class);
LongFunction<Document> docFunc = (long m) -> new Document(docMapFunc.apply(m));
return (l) -> {
Document.parse(op.get("document", l));
return new DataApiInsertOneOp(
spaceFunction.apply(l).getDatabase(),
targetFunction.apply(l),
Document.parse(op.get("document", l))
docFunc.apply(l)
);
};
}

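Taken together, the insert-one change resolves the document template once as a LongFunction and wraps the resulting Map per cycle, instead of re-parsing the document on every cycle via Document.parse. A sketch of the merged method, with field names as in the hunk above:

    private LongFunction<DataApiInsertOneOp> createOpFunction(ParsedOp op) {
        // Resolve the 'document' template once; per cycle, only apply it and
        // wrap the resulting Map in a Document.
        LongFunction<Map> docMapFunc = op.getAsRequiredFunction("document", Map.class);
        LongFunction<Document> docFunc = (long m) -> new Document(docMapFunc.apply(m));
        return (l) -> new DataApiInsertOneOp(
            spaceFunction.apply(l).getDatabase(),
            targetFunction.apply(l),
            docFunc.apply(l)
        );
    }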
@@ -25,6 +25,7 @@ import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Map;
import java.util.function.LongFunction;
public class DataApiInsertOneVectorOpDispenser extends DataApiOpDispenser {
@@ -37,14 +38,15 @@ public class DataApiInsertOneVectorOpDispenser extends DataApiOpDispenser {
}
private LongFunction<DataApiInsertOneVectorOp> createOpFunction(ParsedOp op) {
LongFunction<Map> docMapFunc = op.getAsRequiredFunction("document", Map.class);
LongFunction<Document> docFunc = (long m) -> new Document(docMapFunc.apply(m));
LongFunction<float[]> vectorF= op.getAsRequiredFunction("vector", float[].class);
return (l) -> {
Document.parse(op.get("document", l));
float[] vector = op.get("vector", l);
return new DataApiInsertOneVectorOp(
spaceFunction.apply(l).getDatabase(),
targetFunction.apply(l),
Document.parse(op.get("document", l)),
vector
docFunc.apply(l),
vectorF.apply(l)
);
};
}

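The vector variant applies the same once-per-op resolution to the vector field as well (a sketch, same caveats as above):

    private LongFunction<DataApiInsertOneVectorOp> createOpFunction(ParsedOp op) {
        LongFunction<Map> docMapFunc = op.getAsRequiredFunction("document", Map.class);
        LongFunction<Document> docFunc = (long m) -> new Document(docMapFunc.apply(m));
        // The vector is likewise resolved once as a function of the cycle.
        LongFunction<float[]> vectorF = op.getAsRequiredFunction("vector", float[].class);
        return (l) -> new DataApiInsertOneVectorOp(
            spaceFunction.apply(l).getDatabase(),
            targetFunction.apply(l),
            docFunc.apply(l),
            vectorF.apply(l)
        );
    }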
@@ -56,8 +56,11 @@ public abstract class DataApiOpDispenser extends BaseOpDispenser<DataApiBaseOp,
}
protected Filter getFilterFromOp(ParsedOp op, long l) {
// TODO: Clarify 'filter' vs 'filters' or whether to support both uniformly
Filter filter = null;
Optional<LongFunction<List>> filterFunction = op.getAsOptionalFunction("filters", List.class);
Optional<LongFunction<List>> filterFunction = op.getAsOptionalFunction("filters", List.class).or(
() -> op.getAsOptionalFunction("filter",List.class)
);
if (filterFunction.isPresent()) {
List<Map<String,Object>> filters = filterFunction.get().apply(l);
List<Filter> andFilterList = new ArrayList<>();
@@ -143,7 +146,7 @@
} else if (rawVectorValues instanceof List) {
return getVectorValuesList(rawVectorValues);
} else {
throw new RuntimeException("Invalid type specified for values");
throw new RuntimeException("Invalid type specified for values (type: " + rawVectorValues.getClass().getSimpleName() + "), values: " + rawVectorValues.toString());
}
return floatValues;
}

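The filter lookup above leans on Optional.or (Java 9+), which evaluates its supplier only when the first lookup comes back empty, so an op template may provide either 'filters' or 'filter'. A minimal standalone sketch of the pattern, where lookup is a hypothetical stand-in for op.getAsOptionalFunction:

    import java.util.Optional;
    import java.util.function.LongFunction;

    public class FallbackLookupSketch {
        // Hypothetical stand-in for op.getAsOptionalFunction(key, ...).
        static Optional<LongFunction<String>> lookup(String key) {
            if (key.equals("filters")) {
                return Optional.empty(); // pretend 'filters' is not defined on the op
            }
            return Optional.of(l -> key + ":" + l);
        }

        public static void main(String[] args) {
            // Prefer 'filters', fall back to 'filter', as in getFilterFromOp.
            Optional<LongFunction<String>> f =
                lookup("filters").or(() -> lookup("filter"));
            System.out.println(f.map(fn -> fn.apply(42L)).orElse("none")); // prints filter:42
        }
    }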
@@ -18,10 +18,10 @@ package io.nosqlbench.adapter.dataapi.ops;
import com.datastax.astra.client.Collection;
import com.datastax.astra.client.Database;
import com.datastax.astra.client.model.Document;
import com.datastax.astra.client.model.Filter;
import com.datastax.astra.client.model.FindOneOptions;
import com.datastax.astra.client.model.FindOptions;
import com.datastax.astra.client.model.*;
import java.util.List;
import java.util.stream.StreamSupport;
public class DataApiFindOp extends DataApiBaseOp {
private final Collection<Document> collection;
@@ -36,7 +36,10 @@ public class DataApiFindOp extends DataApiBaseOp {
}
@Override
public Object apply(long value) {
return collection.find(filter, options);
public List<Document> apply(long value) {
// TODO: make sure we're traversing result data
FindIterable<Document> documentStream = collection.find(filter, options);
List<Document> documents = StreamSupport.stream(documentStream.spliterator(), false).toList();
return documents;
}
}

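Merged, the new apply body drains the cursor so the op actually traverses result data rather than handing back a lazy iterable (a sketch; collection, filter, and options are fields of the op class, and FindIterable comes from the Data API client):

    @Override
    public List<Document> apply(long value) {
        // TODO: make sure we're traversing result data
        FindIterable<Document> documentStream = collection.find(filter, options);
        // Materialize eagerly; the boolean argument selects a sequential stream.
        return StreamSupport.stream(documentStream.spliterator(), false).toList();
    }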
@@ -3,13 +3,19 @@ min_version: "5.21.0"
description: |
A basic workload that uses the DataStax Data API Client in Java, emulating what
applications would do in the native stack.
TEMPLATE(cardinality,1000)
TEMPLATE(collection,keyvalue)
TEMPLATE(dimensions,1536)
TEMPLATE(similarity,COSINE)
TEMPLATE(keycount,TEMPLATE(cardinality))
TEMPLATE(valuecount,TEMPLATE(cardinality))
scenarios:
default:
schema: run driver=dataapi tags==block:schema threads==1 cycles==UNDEF
rampup: run driver=dataapi tags==block:rampup cycles===TEMPLATE(rampup-cycles,10000000) threads=auto
# main: run driver=http tags==block:"main.*" cycles===TEMPLATE(main-cycles,10000000) threads=auto
rampup: run driver=dataapi tags==block:rampup cycles===TEMPLATE(rampup-cycles,1000) threads=auto
rampup_vector: run driver=dataapi tags==block:rampup_vector cycles===TEMPLATE(rampup-cycles,TEMPLATE(cardinality)) threads=auto
find_key: run driver=dataapi tags==block:find_key cycles===TEMPLATE(main-cycles,1000) threads=auto
# kv_dapi:
# kv_dapi_schema: run driver=http tags==block:schema threads==1 cycles==UNDEF
@@ -27,13 +33,16 @@ bindings:
# single host: jsonapi_host=host1
# multiple hosts: jsonapi_host=host1,host2,host3
# multiple weighted hosts: jsonapi_host=host1:3,host2:7
weighted_hosts: WeightedStrings('<<jsonapi_host:<<stargate_host:localhost>>>>')
weighted_hosts: WeightedStrings('TEMPLATE(jsonapi_host,TEMPLATE(stargate_host,localhost))')
seq_key: Mod(TEMPLATE(keycount,50000000000L)); ToString() -> String
seq_value: Hash(); Mod(TEMPLATE(valuecount,50000000000L)); ToString() -> String
rw_key: TEMPLATE(keydist,Uniform(0,50000000000L)); ToString() -> String
rw_value: Hash(); TEMPLATE(valdist,Uniform(0,50000000000L)); ToString() -> String
vector_value: HashedFloatVectors(<<dimensions:1536>>); ToCqlVector()
seq_key: Mod(TEMPLATE(keycount)); ToString() -> String
# seq_key: Mod(TEMPLATE(keycount,50000000000L));
seq_value: Hash(); Mod(TEMPLATE(valuecount)); ToString() -> String
# rw_key: TEMPLATE(keydist,Uniform(0,50000000000L));
rw_key: TEMPLATE(keydist,Uniform(0,TEMPLATE(keycount))); ToString() -> String
rw_key_num: TEMPLATE(keydist,Uniform(0,TEMPLATE(keycount)));
rw_value: Hash(); TEMPLATE(valdist,Uniform(0,TEMPLATE(valuecount))); ToString() -> String
vector_value: HashedFloatVectors(TEMPLATE(dimensions,1536));
request_id: ToHashedUUID(); ToString();
token: Discard(); Token('TEMPLATE(auth_token)','TEMPLATE(uri,http://localhost:8081/v1/auth)', 'TEMPLATE(uid,cassandra)', 'TEMPLATE(pswd,cassandra)');
@@ -57,6 +66,18 @@ blocks:
delete_collection: "TEMPLATE(collection)"
create_collection_op:
create_collection: "TEMPLATE(collection)"
dimensions: TEMPLATE(dimensions)
similarity: TEMPLATE(similarity)
# separate these cases later, when you can recreate the same collection name with/without vector support
# schema_vector:
# ops:
# delete_collection_op:
# delete_collection: "TEMPLATE(collection)"
# create_collection_op:
# create_collection: "TEMPLATE(collection)"
# dimensions: TEMPLATE(dimensions)
# similarity: TEMPLATE(similarity)
rampup:
ops:
@@ -65,78 +86,84 @@
document:
_id: "{seq_key}"
value: "{seq_value}"
rampup-uuid:
ops:
insert_one_op:
insert-one: "TEMPLATE(collection)"
document:
value: "{seq_value}"
main_read:
# rampup-uuid:
# ops:
# insert_one_op:
# insert-one: "TEMPLATE(collection)"
# document:
# value: "{seq_value}"
find_key:
params:
ratio: 5
ops:
find_op:
find: "TEMPLATE(collection)"
filter:
_id: "{rw_key}"
schema_with_vector:
ops:
delete_collection_op:
delete_collection: "TEMPLATE(collection)"
create_collection_op:
create_collection: "TEMPLATE(collection)"
options:
vector:
size: 1536
rampup_with_vector:
filters:
- conjunction: "and"
operator: "lt"
field: "_id"
value: "{rw_key_num}"
# schema_with_vector:
# ops:
# delete_collection_op:
# delete_collection: "TEMPLATE(collection)"
# create_collection_op:
# create_collection: "TEMPLATE(collection)"
# options:
# vector:
# size: 1536
rampup_vector:
ops:
insert_one_op:
insert_one_vector: "TEMPLATE(collection)"
vector: "{vector_value}"
document:
_id: "{seq_key}"
value: "{seq_value}"
$vector: "{vector_value}"
rampup_with_vector_uuid:
ops:
insert_one_op:
insert_one: "TEMPLATE(collection)"
document:
value: "{seq_value}"
$vector: "{vector_value}"
main_read_with_vector:
ops:
find_op:
find: "TEMPLATE(collection)"
filter:
_id: "{rw_key}"
main_ann_with_vector_limit_20:
params:
ratio: 5
ops:
find_op:
find: "TEMPLATE(collection)"
sort:
$vector: "{vector_value}"
options:
limit: 20
schema_with_text_sai:
ops:
delete_collection_op:
delete_collection: "TEMPLATE(collection)"
create_collection_op:
create_collection: "TEMPLATE(collection)"
rampup_with_text_sai:
ops:
insert_one_op:
insert_one: "TEMPLATE(collection)"
document:
_id: "{seq_key}"
value: "{seq_value}"
main_read_with_text_sai:
params:
ratio: 5
ops:
find_op:
find: "TEMPLATE(collection)"
filter:
value: "{rw_value}"
#
# rampup_with_vector_uuid:
# ops:
# insert_one_op:
# insert_one: "TEMPLATE(collection)"
# document:
# value: "{seq_value}"
# vector: "{vector_value}"
#
# main_read_with_vector:
# ops:
# find_op:
# find: "TEMPLATE(collection)"
# filter:
# _id: "{rw_key}"
#
# main_ann_with_vector_limit_20:
# params:
# ratio: 5
# ops:
# find_op:
# find: "TEMPLATE(collection)"
# sort:
# vector: "{vector_value}"
# options:
# limit: 20
# schema_with_text_sai:
# ops:
# delete_collection_op:
# delete_collection: "TEMPLATE(collection)"
# create_collection_op:
# create_collection: "TEMPLATE(collection)"
# rampup_with_text_sai:
# ops:
# insert_one_op:
# insert_one: "TEMPLATE(collection)"
# document:
# _id: "{seq_key}"
# value: "{seq_value}"
# main_read_with_text_sai:
# params:
# ratio: 5
# ops:
# find_op:
# find: "TEMPLATE(collection)"
# filter:
# value: "{rw_value}"

@@ -28,6 +28,22 @@ import static org.assertj.core.api.Assertions.assertThat;
public class ParsedTemplateMapTest {
// @Test
// public void testBindpointTypes() {
// ParsedTemplateMap ptm = new ParsedTemplateMap(
// "name1",
// Map.of(
// "field1","{binding1}",
// "field2", " {binding2}"
// ),
// Map.of(
// "binding1","TestingStringFunc('b1')",
// "binding2","TestingStringFunc('b2')"
// ),
// List.of(Map.of()));
//
// }
@Test
public void testParsedTemplateMap() {
ParsedTemplateMap ptm = new ParsedTemplateMap("name1", Map.of("string1", "string2"), Map.of(), List.of());