update vec workload

This commit is contained in:
Jonathan Shook 2024-05-16 14:29:15 -05:00
parent 093e5c9b93
commit 895d7d6659

View File

@ -1,10 +1,14 @@
min_version: 5.21
description: |
This is a template for live vector search testing.
Key parameters:
trainsize: TEMPLATE(trainsize)
testsize: TEMPLATE(testsize)
source_model: TEMPLATE(other)
schema: Install the schema required to run the test
rampup: Measure how long it takes to load a set of embeddings
search_and_index: Measure how the system responds to queries while it
search_and_verify: Measure how the system responds to queries while it
is indexing recently ingested data.
#? await_index: Pause and wait for the system to complete compactions or index processing
search: Run vector search with a set of default (or overridden) parameters
@ -15,32 +19,157 @@ description: |
Also, aggregates of recall should include total aggregate as well as a moving average.
scenarios:
cassandra:
drop: run tags='block:drop' threads==undef cycles==undef context=cassandra
# nb5 cql-vector2 cassandra.schema host=localhost localdc=datacenter1 dimensions=100
schema: run tags='op=create_.*' threads==undef cycles==undef context=cassandra
# nb5 cql-vector2 cassandra.rampup host=localhost localdc=datacenter1 dimensions=100 trainsize=1000000 dataset=glove-100-angular rate=10000
rampup: run tags='block:rampup' threads=auto cycles=TEMPLATE(trainsize,set-the-trainsize) errors=counter,warn context=cassandra
# nb5 cql-vector2 cassandra.search_and_index testsize=10000 host=localhost localdc=datacenter1 dimensions=100 dataset=glove-100-angular --report-csv-to rmetrics:.*:5s
read_recall: >-
run alias=search_and_index tags='block:search_and_index,optype=select' labels='target:cassandra'
cycles=TEMPLATE(testsize) errors=counter,warn threads=1
default:
# Remove any existing data
drop: >-
run tags='block:drop' threads===1 cycles===UNDEF
errors=count
# Install the schema required to run the test
schema_ks: >-
run tags='block:schema_ks' threads===1 cycles===UNDEF
schema: >-
run tags='block:schema' threads===1 cycles===UNDEF
# Truncate any data before loading
# truncate: run tags='block:truncate' threads===1 cycles===UNDEF
# Load training data, measure how long it takes to load
rampup: >-
run tags='block:rampup' threads=TEMPLATE(rampup_threads,auto)
cycles===TEMPLATE(rampup_cycles,TEMPLATE(trainsize))
errors=count,warn
# Measure how the system responds to queries under a read only workload
search_and_verify: >-
run alias=search_and_verify tags='block:search_and_verify,optype=select'
threads=TEMPLATE(search_threads,auto) cycles===TEMPLATE(search_cycles,TEMPLATE(testsize))
errors=count,warn
verify_recall: >-
run alias=verify_recall tags='block:search_and_verify,optype=select'
threads=TEMPLATE(search_threads,auto) cycles===TEMPLATE(search_cycles,TEMPLATE(testsize))
errors=count,warn
astra_vectors:
drop: run tags='block:drop' tags='block:drop' threads==undef cycles==undef
schema: run tags='block:schema' tags='op=create_.*(table|index)' threads==undef cycles==undef dimensions==TEMPLATE(dimensions,25)
train: run tags='block:rampup' threads=20x cycles=TEMPLATE(trainsize) errors=counter,warn maxtries=2 dimensions==TEMPLATE(dimensions,25)
# search_and_index_unthrottled: >-
# run tags='block:search_and_index,optype=select' labels='target:astra'
# cycles=TEMPLATE(testsize) threads=10 errors=count,retry stride=500 errors=counter
testann: >-
run tags='block:testann' cycles=TEMPLATE(testsize) errors=count,retry maxtries=2 threads=auto
# one activity or two? data leap-frog? or concurrency separate for both?
# await_index: run tags='block:await_index' # This would need to exit when a condition is met
# stop_search_and_index: stop search_and_index
# only possible if we have a triggering event to indicated
# live_search: run tags='block:search' labels='target:astra' threads=1 cycles=TEMPLATE(testsize,10000)
search_and_rewrite: run tags='block:search_and_rewrite' labels='target:astra'
search_and_invalidate: run tags='block:search_and_invalidate' labels='target:astra'
# Remove any existing data
drop: >-
run tags='block:drop' threads===1 cycles===UNDEF
errors=count
# Install the schema required to run the test
schema_ks: >-
run tags='block:schema_ks' threads===1 cycles===UNDEF
schema: >-
run tags='block:schema' threads===1 cycles===UNDEF
# Truncate any data before loading
# truncate: run tags='block:truncate' threads===1 cycles===UNDEF
# Load training data, measure how long it takes to load
rampup: >-
run tags='block:rampup' threads=TEMPLATE(rampup_threads,auto)
cycles===TEMPLATE(rampup_cycles,TEMPLATE(trainsize))
errors=count,warn
# Measure how the system responds to queries under a read only workload
search_and_verify: >-
run alias=search_and_verify tags='block:search_and_verify,optype=select'
threads=TEMPLATE(search_threads,auto) cycles===TEMPLATE(search_cycles,TEMPLATE(testsize))
errors=count,warn
verify_recall: >-
run alias=verify_recall tags='block:search_and_verify,optype=select'
threads=TEMPLATE(search_threads,auto) cycles===TEMPLATE(search_cycles,TEMPLATE(testsize))
errors=count,warn
astra_vectors_with_source_model:
# Remove any existing data
drop: >-
run tags='block:drop' threads===1 cycles===UNDEF
errors=count
# Install the schema required to run the test
schema: >-
run tags='block:schema_with_source_model' threads===1 cycles===UNDEF
# Truncate any data before loading
# truncate: run tags='block:truncate' threads===1 cycles===UNDEF
# Load training data, measure how long it takes to load
rampup: >-
run tags='block:rampup' threads=TEMPLATE(rampup_threads,auto)
cycles===TEMPLATE(rampup_cycles,TEMPLATE(trainsize))
errors=count,warn
# Measure how the system responds to queries under a read only workload
search_and_verify: >-
run alias=search_and_verify tags='block:search_and_verify,optype=select'
threads=TEMPLATE(search_threads,auto) cycles===TEMPLATE(search_cycles,TEMPLATE(testsize))
errors=count,warn
verify_recall: >-
run alias=verify_recall tags='block:search_and_verify,optype=select'
threads=TEMPLATE(search_threads,auto) cycles===TEMPLATE(search_cycles,TEMPLATE(testsize))
errors=count,warn
astra_vectors_mixed_workload:
# Measure how the system responds to queries while
# it is indexing recently ingested data
search_and_verify: >-
run alias=search_and_verify tags='block:search_and_verify'
cycles===TEMPLATE(search_cycles) errors=count,retry stride=100 striderate=7.50
errors=counter threads=500
# search_and_rewrite: run tags='block:search_and_rewrite'
# search_and_invalidate: run tags='block:search_and_invalidate'
optimize:
# Remove any existing data
drop: >-
run tags='block:drop' threads===1 cycles===UNDEF
errors=count
# Install the schema required to run the test
schema: >-
run tags='block:schema' threads===1 cycles===UNDEF
# Load training data, measure how long it takes to load
rampup: >-
run tags='block:rampup' threads=TEMPLATE(rampup_threads,auto)
cycles===TEMPLATE(rampup_cycles,TEMPLATE(trainsize))
errors=count,warn
# Start the read only vectory query workload
search_and_verify: >-
start alias=search_and_verify tags='block:search_and_verify,optype=select'
threads=TEMPLATE(search_threads,auto) cycles===TEMPLATE(search_cycles,TEMPLATE(testsize))
errors=count,warn
# Find the optimal rate for the search workload
findmax: >-
findmax activity=search_and_verify
base_value=200
step_value=50
min_frames=10
optimization_type=rate
# Optimize the search workload
optimo: >-
optimo activity=search_and_verify
startrate=${findmax.rate}
sample_time_ms=1000
# Retest the search workload with the optimized rate and thread count
retest: >-
reset activity=search_and_verify
threads==${optimo.threads}
rate==${optimo.rate}
params:
driver: cqld4
@ -48,43 +177,70 @@ params:
bindings:
id: ToString()
test_floatlist: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/test"); ToCqlVector();
relevant_indices: HdfFileToIntArray("testdata/TEMPLATE(datafile).hdf5", "/neighbors")
distance_floatlist: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/distance")
train_floatlist: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/train"); ToCqlVector();
test_floatlist: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/test"); ToCqlVector();
relevant_indices: HdfFileToIntArray("testdata/TEMPLATE(dataset).hdf5", "/neighbors")
distance_floatlist: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/distances")
train_floatlist: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/train"); ToCqlVector();
synthetic_vectors: HashedFloatVectors(TEMPLATE(dimensions));
blocks:
drop:
params:
cl: TEMPLATE(cl,LOCAL_QUORUM)
prepared: false
timeout: 600
ops:
drop_index:
raw: |
DROP INDEX IF EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors);
drop_table:
raw: |
DROP TABLE IF EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors);
drop_index: |
DROP INDEX IF EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors)_value_idx;
drop_table: |
DROP TABLE IF EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors);
truncate:
params:
cl: TEMPLATE(cl,LOCAL_QUORUM)
prepared: false
timeout: 600
ops:
truncate_table: |
truncate TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors);
schema_ks:
params:
cl: TEMPLATE(cl,LOCAL_QUORUM)
prepared: false
ops:
create_keyspace: |
create keyspace if not exists TEMPLATE(keyspace,baselines)
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 'TEMPLATE(rf:1)'}
AND durable_writes = true;
schema:
params:
cl: TEMPLATE(cl,LOCAL_QUORUM)
prepared: false
ops:
create_keyspace:
raw: |
CREATE KEYSPACE IF NOT EXISTS TEMPLATE(keyspace,baselines)
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
create_table:
raw: |
CREATE TABLE IF NOT EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) (
key TEXT,
value vector<float,TEMPLATE(dimensions,set-the-dimensions-template-var)>,
PRIMARY KEY (key)
);
create_sai_index:
raw: |
CREATE CUSTOM INDEX IF NOT EXISTS ON TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) (value) USING 'StorageAttachedIndex'
WITH OPTIONS = {'similarity_function' : 'TEMPLATE(similarity_function,cosine)'};
# WITH OPTIONS = {'maximum_node_connections' : TEMPLATE(M,16), 'construction_beam_width' : TEMPLATE(ef,100), 'similarity_function' : 'TEMPLATE(similarity_function,dot_product)'};
create_table: |
CREATE TABLE IF NOT EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) (
key TEXT,
value vector<float,TEMPLATE(dimensions)>,
PRIMARY KEY (key)
);
create_sai_index: |
CREATE CUSTOM INDEX IF NOT EXISTS ON TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) (value) USING 'StorageAttachedIndex'
WITH OPTIONS = {'similarity_function' : 'TEMPLATE(similarity_function,cosine)'};
schema_with_source_model:
params:
cl: TEMPLATE(cl,LOCAL_QUORUM)
prepared: false
ops:
create_table: |
CREATE TABLE IF NOT EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) (
key TEXT,
value vector<float,TEMPLATE(dimensions)>,
PRIMARY KEY (key)
);
create_sai_index: |
CREATE CUSTOM INDEX IF NOT EXISTS ON TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) (value) USING 'StorageAttachedIndex'
WITH OPTIONS = {'similarity_function' : 'TEMPLATE(similarity_function,cosine)', 'source_model' : 'TEMPLATE(source_model,other)'};
rampup:
params:
cl: TEMPLATE(write_cl,LOCAL_QUORUM)
@ -93,9 +249,7 @@ blocks:
insert: |
INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors)
(key, value) VALUES ({id},{train_floatlist});
# await_index:
# ops:
testann:
search_and_verify:
ops:
select_ann_limit_TEMPLATE(k,100):
prepared: |
@ -105,14 +259,14 @@ blocks:
optype: select
verifier-init: |
k=TEMPLATE(k,100)
relevancy= new io.nosqlbench.api.engine.metrics.wrappers.RelevancyMeasures(_parsed_op);
relevancy=new io.nosqlbench.nb.api.engine.metrics.wrappers.RelevancyMeasures(_parsed_op)
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.recall("recall",k));
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.precision("precision",k));
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.F1("F1",k));
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.reciprocal_rank("RR",k));
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.average_precision("AP",k));
verifier: |
actual_indices=io.nosqlbench.engine.extensions.vectormath.CqlUtils.cqlStringColumnToIntArray("key",result);
actual_indices=cql_utils.cqlStringColumnToIntArray("key",result);
relevancy.accept({relevant_indices},actual_indices);
return true;
insert_rewrite: