From 895d7d665943dc188c24c82ee52de0761942b3bc Mon Sep 17 00:00:00 2001 From: Jonathan Shook Date: Thu, 16 May 2024 14:29:15 -0500 Subject: [PATCH] update vec workload --- .../activities/baselinesv2/cql_vector2.yaml | 268 ++++++++++++++---- 1 file changed, 211 insertions(+), 57 deletions(-) diff --git a/nb-adapters/adapter-cqld4/src/main/resources/activities/baselinesv2/cql_vector2.yaml b/nb-adapters/adapter-cqld4/src/main/resources/activities/baselinesv2/cql_vector2.yaml index 53259b2f5..28b4a419f 100644 --- a/nb-adapters/adapter-cqld4/src/main/resources/activities/baselinesv2/cql_vector2.yaml +++ b/nb-adapters/adapter-cqld4/src/main/resources/activities/baselinesv2/cql_vector2.yaml @@ -1,10 +1,14 @@ min_version: 5.21 description: | This is a template for live vector search testing. + Key parameters: + trainsize: TEMPLATE(trainsize) + testsize: TEMPLATE(testsize) + source_model: TEMPLATE(other) schema: Install the schema required to run the test rampup: Measure how long it takes to load a set of embeddings - search_and_index: Measure how the system responds to queries while it + search_and_verify: Measure how the system responds to queries while it is indexing recently ingested data. #? await_index: Pause and wait for the system to complete compactions or index processing search: Run vector search with a set of default (or overridden) parameters @@ -15,32 +19,157 @@ description: | Also, aggregates of recall should include total aggregate as well as a moving average. scenarios: - cassandra: - drop: run tags='block:drop' threads==undef cycles==undef context=cassandra - # nb5 cql-vector2 cassandra.schema host=localhost localdc=datacenter1 dimensions=100 - schema: run tags='op=create_.*' threads==undef cycles==undef context=cassandra - # nb5 cql-vector2 cassandra.rampup host=localhost localdc=datacenter1 dimensions=100 trainsize=1000000 dataset=glove-100-angular rate=10000 - rampup: run tags='block:rampup' threads=auto cycles=TEMPLATE(trainsize,set-the-trainsize) errors=counter,warn context=cassandra - # nb5 cql-vector2 cassandra.search_and_index testsize=10000 host=localhost localdc=datacenter1 dimensions=100 dataset=glove-100-angular --report-csv-to rmetrics:.*:5s - read_recall: >- - run alias=search_and_index tags='block:search_and_index,optype=select' labels='target:cassandra' - cycles=TEMPLATE(testsize) errors=counter,warn threads=1 + + default: + # Remove any existing data + drop: >- + run tags='block:drop' threads===1 cycles===UNDEF + errors=count + + # Install the schema required to run the test + schema_ks: >- + run tags='block:schema_ks' threads===1 cycles===UNDEF + schema: >- + run tags='block:schema' threads===1 cycles===UNDEF + + # Truncate any data before loading + # truncate: run tags='block:truncate' threads===1 cycles===UNDEF + + # Load training data, measure how long it takes to load + rampup: >- + run tags='block:rampup' threads=TEMPLATE(rampup_threads,auto) + cycles===TEMPLATE(rampup_cycles,TEMPLATE(trainsize)) + errors=count,warn + + # Measure how the system responds to queries under a read only workload + search_and_verify: >- + run alias=search_and_verify tags='block:search_and_verify,optype=select' + threads=TEMPLATE(search_threads,auto) cycles===TEMPLATE(search_cycles,TEMPLATE(testsize)) + errors=count,warn + + verify_recall: >- + run alias=verify_recall tags='block:search_and_verify,optype=select' + threads=TEMPLATE(search_threads,auto) cycles===TEMPLATE(search_cycles,TEMPLATE(testsize)) + errors=count,warn + + + astra_vectors: - drop: run tags='block:drop' tags='block:drop' threads==undef cycles==undef - schema: run tags='block:schema' tags='op=create_.*(table|index)' threads==undef cycles==undef dimensions==TEMPLATE(dimensions,25) - train: run tags='block:rampup' threads=20x cycles=TEMPLATE(trainsize) errors=counter,warn maxtries=2 dimensions==TEMPLATE(dimensions,25) -# search_and_index_unthrottled: >- -# run tags='block:search_and_index,optype=select' labels='target:astra' -# cycles=TEMPLATE(testsize) threads=10 errors=count,retry stride=500 errors=counter - testann: >- - run tags='block:testann' cycles=TEMPLATE(testsize) errors=count,retry maxtries=2 threads=auto - # one activity or two? data leap-frog? or concurrency separate for both? - # await_index: run tags='block:await_index' # This would need to exit when a condition is met - # stop_search_and_index: stop search_and_index - # only possible if we have a triggering event to indicated - # live_search: run tags='block:search' labels='target:astra' threads=1 cycles=TEMPLATE(testsize,10000) - search_and_rewrite: run tags='block:search_and_rewrite' labels='target:astra' - search_and_invalidate: run tags='block:search_and_invalidate' labels='target:astra' + + # Remove any existing data + drop: >- + run tags='block:drop' threads===1 cycles===UNDEF + errors=count + + # Install the schema required to run the test + schema_ks: >- + run tags='block:schema_ks' threads===1 cycles===UNDEF + schema: >- + run tags='block:schema' threads===1 cycles===UNDEF + + # Truncate any data before loading +# truncate: run tags='block:truncate' threads===1 cycles===UNDEF + + # Load training data, measure how long it takes to load + rampup: >- + run tags='block:rampup' threads=TEMPLATE(rampup_threads,auto) + cycles===TEMPLATE(rampup_cycles,TEMPLATE(trainsize)) + errors=count,warn + + # Measure how the system responds to queries under a read only workload + search_and_verify: >- + run alias=search_and_verify tags='block:search_and_verify,optype=select' + threads=TEMPLATE(search_threads,auto) cycles===TEMPLATE(search_cycles,TEMPLATE(testsize)) + errors=count,warn + + verify_recall: >- + run alias=verify_recall tags='block:search_and_verify,optype=select' + threads=TEMPLATE(search_threads,auto) cycles===TEMPLATE(search_cycles,TEMPLATE(testsize)) + errors=count,warn + + astra_vectors_with_source_model: + + # Remove any existing data + drop: >- + run tags='block:drop' threads===1 cycles===UNDEF + errors=count + + # Install the schema required to run the test + schema: >- + run tags='block:schema_with_source_model' threads===1 cycles===UNDEF + + # Truncate any data before loading + # truncate: run tags='block:truncate' threads===1 cycles===UNDEF + + # Load training data, measure how long it takes to load + rampup: >- + run tags='block:rampup' threads=TEMPLATE(rampup_threads,auto) + cycles===TEMPLATE(rampup_cycles,TEMPLATE(trainsize)) + errors=count,warn + + # Measure how the system responds to queries under a read only workload + search_and_verify: >- + run alias=search_and_verify tags='block:search_and_verify,optype=select' + threads=TEMPLATE(search_threads,auto) cycles===TEMPLATE(search_cycles,TEMPLATE(testsize)) + errors=count,warn + + verify_recall: >- + run alias=verify_recall tags='block:search_and_verify,optype=select' + threads=TEMPLATE(search_threads,auto) cycles===TEMPLATE(search_cycles,TEMPLATE(testsize)) + errors=count,warn + + astra_vectors_mixed_workload: + # Measure how the system responds to queries while + # it is indexing recently ingested data + search_and_verify: >- + run alias=search_and_verify tags='block:search_and_verify' + cycles===TEMPLATE(search_cycles) errors=count,retry stride=100 striderate=7.50 + errors=counter threads=500 + + # search_and_rewrite: run tags='block:search_and_rewrite' + # search_and_invalidate: run tags='block:search_and_invalidate' + + optimize: + # Remove any existing data + drop: >- + run tags='block:drop' threads===1 cycles===UNDEF + errors=count + + # Install the schema required to run the test + schema: >- + run tags='block:schema' threads===1 cycles===UNDEF + + # Load training data, measure how long it takes to load + rampup: >- + run tags='block:rampup' threads=TEMPLATE(rampup_threads,auto) + cycles===TEMPLATE(rampup_cycles,TEMPLATE(trainsize)) + errors=count,warn + + # Start the read only vectory query workload + search_and_verify: >- + start alias=search_and_verify tags='block:search_and_verify,optype=select' + threads=TEMPLATE(search_threads,auto) cycles===TEMPLATE(search_cycles,TEMPLATE(testsize)) + errors=count,warn + + # Find the optimal rate for the search workload + findmax: >- + findmax activity=search_and_verify + base_value=200 + step_value=50 + min_frames=10 + optimization_type=rate + + # Optimize the search workload + optimo: >- + optimo activity=search_and_verify + startrate=${findmax.rate} + sample_time_ms=1000 + + # Retest the search workload with the optimized rate and thread count + retest: >- + reset activity=search_and_verify + threads==${optimo.threads} + rate==${optimo.rate} params: driver: cqld4 @@ -48,43 +177,70 @@ params: bindings: id: ToString() - test_floatlist: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/test"); ToCqlVector(); - relevant_indices: HdfFileToIntArray("testdata/TEMPLATE(datafile).hdf5", "/neighbors") - distance_floatlist: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/distance") - train_floatlist: HdfFileToFloatList("testdata/TEMPLATE(datafile).hdf5", "/train"); ToCqlVector(); + test_floatlist: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/test"); ToCqlVector(); + relevant_indices: HdfFileToIntArray("testdata/TEMPLATE(dataset).hdf5", "/neighbors") + distance_floatlist: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/distances") + train_floatlist: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/train"); ToCqlVector(); synthetic_vectors: HashedFloatVectors(TEMPLATE(dimensions)); blocks: drop: params: cl: TEMPLATE(cl,LOCAL_QUORUM) + prepared: false + timeout: 600 ops: - drop_index: - raw: | - DROP INDEX IF EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors); - drop_table: - raw: | - DROP TABLE IF EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors); + drop_index: | + DROP INDEX IF EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors)_value_idx; + drop_table: | + DROP TABLE IF EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors); + truncate: + params: + cl: TEMPLATE(cl,LOCAL_QUORUM) + prepared: false + timeout: 600 + ops: + truncate_table: | + truncate TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors); + + schema_ks: + params: + cl: TEMPLATE(cl,LOCAL_QUORUM) + prepared: false + ops: + create_keyspace: | + create keyspace if not exists TEMPLATE(keyspace,baselines) + WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 'TEMPLATE(rf:1)'} + AND durable_writes = true; + schema: params: cl: TEMPLATE(cl,LOCAL_QUORUM) + prepared: false ops: - create_keyspace: - raw: | - CREATE KEYSPACE IF NOT EXISTS TEMPLATE(keyspace,baselines) - WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}; - create_table: - raw: | - CREATE TABLE IF NOT EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) ( - key TEXT, - value vector, - PRIMARY KEY (key) - ); - create_sai_index: - raw: | - CREATE CUSTOM INDEX IF NOT EXISTS ON TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) (value) USING 'StorageAttachedIndex' - WITH OPTIONS = {'similarity_function' : 'TEMPLATE(similarity_function,cosine)'}; -# WITH OPTIONS = {'maximum_node_connections' : TEMPLATE(M,16), 'construction_beam_width' : TEMPLATE(ef,100), 'similarity_function' : 'TEMPLATE(similarity_function,dot_product)'}; + create_table: | + CREATE TABLE IF NOT EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) ( + key TEXT, + value vector, + PRIMARY KEY (key) + ); + create_sai_index: | + CREATE CUSTOM INDEX IF NOT EXISTS ON TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) (value) USING 'StorageAttachedIndex' + WITH OPTIONS = {'similarity_function' : 'TEMPLATE(similarity_function,cosine)'}; + schema_with_source_model: + params: + cl: TEMPLATE(cl,LOCAL_QUORUM) + prepared: false + ops: + create_table: | + CREATE TABLE IF NOT EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) ( + key TEXT, + value vector, + PRIMARY KEY (key) + ); + create_sai_index: | + CREATE CUSTOM INDEX IF NOT EXISTS ON TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) (value) USING 'StorageAttachedIndex' + WITH OPTIONS = {'similarity_function' : 'TEMPLATE(similarity_function,cosine)', 'source_model' : 'TEMPLATE(source_model,other)'}; rampup: params: cl: TEMPLATE(write_cl,LOCAL_QUORUM) @@ -93,9 +249,7 @@ blocks: insert: | INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,vectors) (key, value) VALUES ({id},{train_floatlist}); -# await_index: -# ops: - testann: + search_and_verify: ops: select_ann_limit_TEMPLATE(k,100): prepared: | @@ -105,14 +259,14 @@ blocks: optype: select verifier-init: | k=TEMPLATE(k,100) - relevancy= new io.nosqlbench.api.engine.metrics.wrappers.RelevancyMeasures(_parsed_op); + relevancy=new io.nosqlbench.nb.api.engine.metrics.wrappers.RelevancyMeasures(_parsed_op) relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.recall("recall",k)); relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.precision("precision",k)); relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.F1("F1",k)); relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.reciprocal_rank("RR",k)); relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.average_precision("AP",k)); verifier: | - actual_indices=io.nosqlbench.engine.extensions.vectormath.CqlUtils.cqlStringColumnToIntArray("key",result); + actual_indices=cql_utils.cqlStringColumnToIntArray("key",result); relevancy.accept({relevant_indices},actual_indices); return true; insert_rewrite: