diff --git a/.run/qdrant_upsert_points_glove_25.run.xml b/.run/qdrant_upsert_points_glove_25.run.xml new file mode 100644 index 000000000..89e43bde9 --- /dev/null +++ b/.run/qdrant_upsert_points_glove_25.run.xml @@ -0,0 +1,14 @@ + + + + + + \ No newline at end of file diff --git a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/QdrantOpMapper.java b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/QdrantOpMapper.java index 39c2dbe49..ca13372ce 100644 --- a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/QdrantOpMapper.java +++ b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/QdrantOpMapper.java @@ -61,6 +61,7 @@ public class QdrantOpMapper implements OpMapper> { case create_payload_index -> new QdrantCreatePayloadIndexOpDispenser(adapter, op, typeAndTarget.targetFunction); case search_points -> new QdrantSearchPointsOpDispenser(adapter, op, typeAndTarget.targetFunction); + case upsert_points -> new QdrantUpsertPointsOpDispenser(adapter, op, typeAndTarget.targetFunction); // default -> throw new RuntimeException("Unrecognized op type '" + typeAndTarget.enumId.name() + "' while " + // "mapping parsed op " + op); }; diff --git a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/opdispensers/QdrantCreatePayloadIndexOpDispenser.java b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/opdispensers/QdrantCreatePayloadIndexOpDispenser.java index d77eabe9c..75406f86c 100644 --- a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/opdispensers/QdrantCreatePayloadIndexOpDispenser.java +++ b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/opdispensers/QdrantCreatePayloadIndexOpDispenser.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024 nosqlbench + * Copyright (c) 2020-2024 nosqlbench * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/opdispensers/QdrantSearchPointsOpDispenser.java b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/opdispensers/QdrantSearchPointsOpDispenser.java index 4910d8f31..05762e4a6 100644 --- a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/opdispensers/QdrantSearchPointsOpDispenser.java +++ b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/opdispensers/QdrantSearchPointsOpDispenser.java @@ -42,7 +42,10 @@ public class QdrantSearchPointsOpDispenser extends QdrantBaseOpDispenser getParamFunc(LongFunction clientF, ParsedOp op, LongFunction targetF) { + public LongFunction getParamFunc( + LongFunction clientF, + ParsedOp op, + LongFunction targetF) { LongFunction ebF = l -> SearchPoints.newBuilder().setCollectionName(targetF.apply(l)); @@ -55,7 +58,7 @@ public class QdrantSearchPointsOpDispenser extends QdrantBaseOpDispenser b.setWithVectors(WithVectorsSelector.newBuilder().setEnable(wp).build())); ebF = op.enhanceFuncOptionally(ebF, "read_consistency", Number.class, (SearchPoints.Builder b, Number rc) -> b.setReadConsistency( - ReadConsistency.newBuilder().setType(ReadConsistencyType.valueOf(rc.intValue())).build())); + ReadConsistency.newBuilder().setType(ReadConsistencyType.forNumber(rc.intValue())).build())); // ebF = op.enhanceFunc(ebF, List.of("vector_vector", "vectors"), List.class, // (SearchPoints.Builder b, List vec) -> b.addAllVector(vec)); diff --git a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/opdispensers/QdrantUpsertPointsOpDispenser.java b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/opdispensers/QdrantUpsertPointsOpDispenser.java new file mode 100644 index 000000000..7b67d4f49 --- /dev/null +++ b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/opdispensers/QdrantUpsertPointsOpDispenser.java @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2020-2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.qdrant.opdispensers; + +import io.nosqlbench.adapter.qdrant.QdrantDriverAdapter; +import io.nosqlbench.adapter.qdrant.ops.QdrantBaseOp; +import io.nosqlbench.adapter.qdrant.ops.QdrantUpsertPointsOp; +import io.nosqlbench.adapters.api.activityimpl.OpDispenser; +import io.nosqlbench.adapters.api.templating.ParsedOp; +import io.qdrant.client.QdrantClient; +import io.qdrant.client.ValueFactory; +import io.qdrant.client.VectorFactory; +import io.qdrant.client.VectorsFactory; +import io.qdrant.client.grpc.Collections; +import io.qdrant.client.grpc.JsonWithInt.ListValue; +import io.qdrant.client.grpc.JsonWithInt.NullValue; +import io.qdrant.client.grpc.JsonWithInt.Struct; +import io.qdrant.client.grpc.JsonWithInt.Value; +import io.qdrant.client.grpc.Points.*; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.LongFunction; + +public class QdrantUpsertPointsOpDispenser extends QdrantBaseOpDispenser { + private static final Logger logger = LogManager.getLogger(QdrantUpsertPointsOpDispenser.class); + + /** + * Create a new {@link QdrantUpsertPointsOpDispenser} implementing the {@link OpDispenser} interface. + * @param adapter + * @param op + * @param targetFunction + * @see Upsert Points + */ + public QdrantUpsertPointsOpDispenser(QdrantDriverAdapter adapter, ParsedOp op, LongFunction targetFunction) { + super(adapter, op, targetFunction); + } + + @Override + public LongFunction getParamFunc( + LongFunction clientF, + ParsedOp op, + LongFunction targetF + ) { + LongFunction ebF = + l -> UpsertPoints.newBuilder().setCollectionName(targetF.apply(l)); + + // set wait and ordering query params + ebF = op.enhanceFuncOptionally(ebF, "wait", Boolean.class, UpsertPoints.Builder::setWait); + WriteOrdering.Builder writeOrdering = WriteOrdering.newBuilder(); + op.getOptionalStaticValue("ordering", Number.class) + .ifPresent((Number ordering) -> { + writeOrdering.setType(WriteOrderingType.forNumber(ordering.intValue())); + }); + final LongFunction orderingF = ebF; + ebF = l -> orderingF.apply(l).setOrdering(writeOrdering); + + // request body begins here + ShardKeySelector.Builder shardKeySelector = ShardKeySelector.newBuilder(); + op.getOptionalStaticValue("shard_key", Number.class) + .ifPresent((Number value) -> { + shardKeySelector.setShardKeys(0, Collections.ShardKey.newBuilder().setNumber(value.longValue())); + }); + + List allPoints = buildPointsStructWithNamedVectors(op); + final LongFunction pointsOfNamedVectorsF = ebF; + ebF = l -> pointsOfNamedVectorsF.apply(l).addAllPoints(allPoints); + + final LongFunction lastF = ebF; + return l -> lastF.apply(l).build(); + } + + private List buildPointsStructWithNamedVectors(ParsedOp op) { + List allPoints = new ArrayList<>(); + PointStruct.Builder pointBuilder = PointStruct.newBuilder(); + + PointId.Builder pointId = PointId.newBuilder(); + // id is mandatory + Object idObject = op.getAsRequiredFunction("id", Object.class).apply(0L); + if (idObject instanceof Number) { + pointId.setNum(((Number) idObject).longValue()); + } else if (idObject instanceof String) { + pointId.setUuid((String) idObject); + } + pointBuilder.setId(pointId); + + if (op.isDefined("payload")) { + LongFunction payloadMapF = op.getAsRequiredFunction("payload", Map.class); + Map payloadMapData = new HashMap<>(); + payloadMapF.apply(0L).forEach((pKey, pVal) -> { + if(pVal instanceof Boolean) { + payloadMapData.put((String) pKey, ValueFactory.value((Boolean) pVal)); + } else if(pVal instanceof Double) { + payloadMapData.put((String) pKey, ValueFactory.value((Double) pVal)); + } else if(pVal instanceof Integer) { + payloadMapData.put((String) pKey, ValueFactory.value((Integer) pVal)); + } else if(pVal instanceof String) { + payloadMapData.put((String) pKey, ValueFactory.value((String) pVal)); + } else if(pVal instanceof ListValue) { + payloadMapData.put((String) pKey, ValueFactory.list((List) pVal)); + } else if(pVal instanceof NullValue) { + payloadMapData.put((String) pKey, ValueFactory.nullValue()); + } else if(pVal instanceof Struct) { + payloadMapData.put((String) pKey, Value.newBuilder().setStructValue((Struct) pVal).build()); + } else { + logger.warn("Unknown payload type passed." + + " Only https://qdrant.tech/documentation/concepts/payload/#payload-types are supported." + + " {} will be inored.", pVal.toString()); + } + }); + pointBuilder.putAllPayload(payloadMapData); + } + + LongFunction namedVectorMapF = op.getAsRequiredFunction("vector", Map.class); + Map namedVectorMapData = new HashMap<>(); + List sparseVectors = new ArrayList<>(); + List sparseIndices = new ArrayList<>(); + namedVectorMapF.apply(0L).forEach((nvKey, nvVal) -> { + if (nvVal instanceof Map) { + // we deal with named sparse vectors here + Map namedSparseVectorsMap = (Map) nvVal; + if (namedSparseVectorsMap.containsKey("indices") && namedSparseVectorsMap.containsKey("values")) { + sparseVectors.addAll((List) namedSparseVectorsMap.get("values")); + sparseIndices.addAll((List) namedSparseVectorsMap.get("indices")); + } + namedVectorMapData.put((String) nvKey, VectorFactory.vector(sparseVectors, sparseIndices)); + } else { + // Deal with regular named dense vectors here + namedVectorMapData.put((String) nvKey, VectorFactory.vector((List) nvVal)); + } + }); + pointBuilder.setVectors(VectorsFactory.namedVectors(namedVectorMapData)); + allPoints.add(pointBuilder.build()); + + return allPoints; + } + + /** + * Create a new {@link QdrantUpsertPointsOp} implementing the {@link QdrantBaseOp} interface. + * @see Upsert Points + */ + @Override + public LongFunction> createOpFunc( + LongFunction paramF, + LongFunction clientF, + ParsedOp op, + LongFunction targetF + ) { + return l -> new QdrantUpsertPointsOp(clientF.apply(l), paramF.apply(l)); + } +} diff --git a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantCreateCollectionOp.java b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantCreateCollectionOp.java index 4a0198a57..9c47ce0bf 100644 --- a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantCreateCollectionOp.java +++ b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantCreateCollectionOp.java @@ -16,12 +16,13 @@ package io.nosqlbench.adapter.qdrant.ops; -import com.google.common.util.concurrent.ListenableFuture; import io.nosqlbench.adapters.api.templating.ParsedOp; import io.qdrant.client.QdrantClient; import io.qdrant.client.grpc.Collections.CollectionOperationResponse; import io.qdrant.client.grpc.Collections.CreateCollection; +import java.util.concurrent.ExecutionException; + public class QdrantCreateCollectionOp extends QdrantBaseOp { /** * Create a new {@link ParsedOp} encapsulating a call to the Qdrant create collection method. @@ -35,19 +36,12 @@ public class QdrantCreateCollectionOp extends QdrantBaseOp { @Override public Object applyOp(long value) { -// ListenableFuture response = client.createCollectionAsync( -// CreateCollection.newBuilder() -// .setCollectionName("test") -// .setVectorsConfig(VectorsConfig.newBuilder() -// .setParams( -// VectorParams.newBuilder() -// .setDistance(Distance.Cosine) -// .setSize(25) -// .build() -// ).build() -// ).build() -// ); - ListenableFuture response = client.createCollectionAsync(request); + CollectionOperationResponse response = null; + try { + response = client.createCollectionAsync(request).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } return response; } } diff --git a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantDeleteCollectionOp.java b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantDeleteCollectionOp.java index 9613b4723..2b56cbacc 100644 --- a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantDeleteCollectionOp.java +++ b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantDeleteCollectionOp.java @@ -16,11 +16,12 @@ package io.nosqlbench.adapter.qdrant.ops; -import com.google.common.util.concurrent.ListenableFuture; import io.qdrant.client.QdrantClient; import io.qdrant.client.grpc.Collections.CollectionOperationResponse; import io.qdrant.client.grpc.Collections.DeleteCollection; +import java.util.concurrent.ExecutionException; + public class QdrantDeleteCollectionOp extends QdrantBaseOp { public QdrantDeleteCollectionOp(QdrantClient client, DeleteCollection request) { super(client, request); @@ -28,7 +29,12 @@ public class QdrantDeleteCollectionOp extends QdrantBaseOp { @Override public Object applyOp(long value) { - ListenableFuture response = client.deleteCollectionAsync(request.getCollectionName()); + CollectionOperationResponse response = null; + try { + response = client.deleteCollectionAsync(request.getCollectionName()).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } return response; } diff --git a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantPayloadIndexOp.java b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantPayloadIndexOp.java index 27b148797..c9a7e8da8 100644 --- a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantPayloadIndexOp.java +++ b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantPayloadIndexOp.java @@ -26,6 +26,7 @@ public class QdrantPayloadIndexOp extends QdrantBaseOp { @Override public Object applyOp(long value) { + //client.createPayloadIndexAsync(PayloadIndexParams.get); return null; } } diff --git a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantSearchPointsOp.java b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantSearchPointsOp.java index 239fc949f..5863b4a7e 100644 --- a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantSearchPointsOp.java +++ b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantSearchPointsOp.java @@ -16,12 +16,12 @@ package io.nosqlbench.adapter.qdrant.ops; -import com.google.common.util.concurrent.ListenableFuture; import io.qdrant.client.QdrantClient; import io.qdrant.client.grpc.Points.ScoredPoint; import io.qdrant.client.grpc.Points.SearchPoints; import java.util.List; +import java.util.concurrent.ExecutionException; public class QdrantSearchPointsOp extends QdrantBaseOp { public QdrantSearchPointsOp(QdrantClient client, SearchPoints request) { @@ -30,8 +30,12 @@ public class QdrantSearchPointsOp extends QdrantBaseOp { @Override public Object applyOp(long value) { - ListenableFuture> result = - client.searchAsync(request); + List result = null; + try { + result = client.searchAsync(request).get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } return result; } } diff --git a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantUpsertPointsOp.java b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantUpsertPointsOp.java new file mode 100644 index 000000000..010d149eb --- /dev/null +++ b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/ops/QdrantUpsertPointsOp.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2020-2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.qdrant.ops; + +import io.qdrant.client.QdrantClient; +import io.qdrant.client.grpc.Points.UpdateResult; +import io.qdrant.client.grpc.Points.UpsertPoints; + +import java.util.concurrent.ExecutionException; + +public class QdrantUpsertPointsOp extends QdrantBaseOp { + public QdrantUpsertPointsOp(QdrantClient client, UpsertPoints request) { + super(client, request); + } + + @Override + public Object applyOp(long value) { + UpdateResult response = null; + String responseStatus; + long responseOperationId; + try { + response = client.upsertAsync(request).get(); + responseStatus = response.getStatus().toString(); + responseOperationId = response.getOperationId(); + switch(response.getStatus()) { + case Completed, Acknowledged -> logger.trace("Upsert points finished successfully." + + " [Status ({}) for Operation id ({})]", responseStatus, responseOperationId); + case UnknownUpdateStatus, ClockRejected -> logger.error("Upsert points failed with status '{}'" + + " for operation id '{}'", responseStatus, responseOperationId); + default -> logger.error("Unknown status '{}' for operation id '{}'", responseStatus, responseOperationId); + } + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + return response; + } +} diff --git a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/types/QdrantOpType.java b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/types/QdrantOpType.java index be1dd52b4..61da130c2 100644 --- a/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/types/QdrantOpType.java +++ b/nb-adapters/adapter-qdrant/src/main/java/io/nosqlbench/adapter/qdrant/types/QdrantOpType.java @@ -22,4 +22,7 @@ public enum QdrantOpType { create_payload_index, // https://qdrant.github.io/qdrant/redoc/index.html#tag/points/operation/search_points search_points, + // https://qdrant.tech/documentation/concepts/points/ + // https://qdrant.github.io/qdrant/redoc/index.html#tag/points/operation/upsert_points + upsert_points, } diff --git a/nb-adapters/adapter-qdrant/src/main/resources/activities/qdrant_vectors_live.yaml b/nb-adapters/adapter-qdrant/src/main/resources/activities/qdrant_vectors_live.yaml index 865d4f664..a1acfb003 100644 --- a/nb-adapters/adapter-qdrant/src/main/resources/activities/qdrant_vectors_live.yaml +++ b/nb-adapters/adapter-qdrant/src/main/resources/activities/qdrant_vectors_live.yaml @@ -23,10 +23,15 @@ scenarios: errors===stop cycles===UNDEF threads===UNDEF uri=TEMPLATE(qdranthost) token_file=TEMPLATE(token_file) + rampup: >- + run tags==block:rampup + errors===warn,counter + cycles===TEMPLATE(train_cycles,TEMPLATE(trainsize,1000)) threads===TEMPLATE(train_threads,AUTO) + uri=TEMPLATE(qdranthost) token_file=TEMPLATE(token_file) search_points: >- run tags==block:search_points - errors===stop - cycles===TEMPLATE(testann_cycles,1000) threads===TEMPLATE(testann_threads,AUTO) + errors===warn,counter + cycles===TEMPLATE(testann_cycles,TEMPLATE(testsize,1000)) threads===TEMPLATE(testann_threads,AUTO) uri=TEMPLATE(qdranthost) token_file=TEMPLATE(token_file) params: @@ -34,6 +39,7 @@ params: instrument: true bindings: + id_val: Identity(); row_key: ToString() row_key_batch: Mul(TEMPLATE(batch_size)L); ListSizedStepped(TEMPLATE(batch_size),long->ToString()); # filetype=hdf5 for TEMPLATE(filetype,hdf5) @@ -48,6 +54,8 @@ bindings: distance_floatlist_fvec: FVecReader("testdata/TEMPLATE(dataset)_TEMPLATE(testsize)_distances_count.fvec",TEMPLATE(dimensions),0); train_floatlist_fvec: FVecReader("testdata/TEMPLATE(dataset)_TEMPLATE(trainsize)_base_vectors.fvec",TEMPLATE(dimensions),0); train_floatlist_fvec_batch: Mul(TEMPLATE(batch_size,10)L); ListSizedStepped(TEMPLATE(batch_size),FVecReader("testdata/TEMPLATE(dataset)_TEMPLATE(trainsize)_base_vectors.fvec",TEMPLATE(dimensions),0)); + test_2d_fl_vecs: HashedFloatVectors(2) + list_sized: ListSized(HashaRange(2,2), Identity(), Identity()) blocks: delete_collection: @@ -108,6 +116,27 @@ blocks: # full_scan_threshold: 100 # on_disk: true + rampup: + ops: + upsert_points_op: + upsert_points: "TEMPLATE(collection)" + wait: TEMPLATE(upsert_point_wait,true) + # https://github.com/qdrant/qdrant/blob/v1.9.0/lib/api/src/grpc/proto/points.proto#L11-L15 + # 0 - Weak, 1 - Medium, 2 - Strong + ordering: TEMPLATE(upsert_point_ordering,1) + #shard_key: "{row_key}" + id: "{id_val}" + payload: + key: "{row_key}" + vector: + # For dense vectors, use the below format + value: "{train_floatlist_TEMPLATE(filetype)}" + # For sparse vectors, use the below format + #value_sv: + # indices: your array of numbers + # values: your array of floats + + search_points: ops: search_points_op: