diff --git a/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/MilvusOpMapper.java b/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/MilvusOpMapper.java index 9a11f8506..ddc749aae 100644 --- a/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/MilvusOpMapper.java +++ b/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/MilvusOpMapper.java @@ -33,7 +33,7 @@ public class MilvusOpMapper implements OpMapper { /** * Create a new MilvusOpMapper implementing the {@link OpMapper} interface. * - * @param adapter The associated {@link MilvusDriverAdapter} + * @param adapter The associated {@link MilvusDriverAdapter} */ public MilvusOpMapper(MilvusDriverAdapter adapter) { this.adapter = adapter; @@ -42,8 +42,8 @@ public class MilvusOpMapper implements OpMapper { /** * Given an instance of a {@link ParsedOp} returns the appropriate {@link MilvusOpDispenser} subclass * - * @param op The ParsedOp to be evaluated - * @return The correct MilvusOpDispenser subclass based on the op type + * @param op The {@link ParsedOp} to be evaluated + * @return The correct {@link MilvusOpDispenser} subclass based on the op type */ @Override public OpDispenser apply(ParsedOp op) { @@ -57,10 +57,11 @@ public class MilvusOpMapper implements OpMapper { return switch (typeAndTarget.enumId) { case drop_collection -> new MilvusDropCollectionOpDispenser(adapter, op, typeAndTarget.targetFunction); - case create_collection -> new MilvusCreateCollectionOpDispenser(adapter, op, typeAndTarget.targetFunction); - case create_index -> new MilvusCreateIndexOpDispenser(adapter,op, typeAndTarget.targetFunction); - case drop_index -> new MilvusDropIndexOpDispenser(adapter,op,typeAndTarget.targetFunction); - case insert -> new MilvusInsertOpDispenser(adapter,op,typeAndTarget.targetFunction); + case create_collection -> new MilvusCreateCollectionOpDispenser(adapter, op, typeAndTarget.targetFunction); + case create_index -> new MilvusCreateIndexOpDispenser(adapter, op, typeAndTarget.targetFunction); + case drop_index -> new MilvusDropIndexOpDispenser(adapter, op, typeAndTarget.targetFunction); + case insert -> new MilvusInsertOpDispenser(adapter, op, typeAndTarget.targetFunction); + case search -> new MilvusSearchOpDispenser(adapter, op, typeAndTarget.targetFunction); default -> throw new RuntimeException("Unrecognized op type '" + typeAndTarget.enumId.name() + "' while " + "mapping parsed op " + op); }; diff --git a/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/opdispensers/MilvusInsertOpDispenser.java b/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/opdispensers/MilvusInsertOpDispenser.java index 06b01767d..2fa072b91 100644 --- a/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/opdispensers/MilvusInsertOpDispenser.java +++ b/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/opdispensers/MilvusInsertOpDispenser.java @@ -19,12 +19,9 @@ package io.nosqlbench.adapter.milvus.opdispensers; import com.alibaba.fastjson.JSONObject; -import com.google.gson.JsonObject; import io.milvus.client.MilvusServiceClient; import io.milvus.param.dml.InsertParam; -import io.milvus.param.index.DropIndexParam; import io.nosqlbench.adapter.milvus.MilvusDriverAdapter; -import io.nosqlbench.adapter.milvus.ops.MilvusDropIndexOp; import io.nosqlbench.adapter.milvus.ops.MilvusInsertOp; import io.nosqlbench.adapters.api.templating.ParsedOp; import org.apache.logging.log4j.LogManager; @@ -41,12 +38,9 @@ public class MilvusInsertOpDispenser extends MilvusOpDispenser { /** * Create a new MilvusDeleteOpDispenser subclassed from {@link MilvusOpDispenser}. * - * @param adapter - * The associated {@link MilvusDriverAdapter} - * @param op - * The {@link ParsedOp} encapsulating the activity for this cycle - * @param targetFunction - * A LongFunction that returns the specified Milvus Index for this Op + * @param adapter The associated {@link MilvusDriverAdapter} + * @param op The {@link ParsedOp} encapsulating the activity for this cycle + * @param targetFunction A LongFunction that returns the specified Milvus Index for this Op */ public MilvusInsertOpDispenser(MilvusDriverAdapter adapter, ParsedOp op, @@ -80,12 +74,12 @@ public class MilvusInsertOpDispenser extends MilvusOpDispenser { } private LongFunction> createFieldsF(ParsedOp op) { - LongFunction fielddata = op.getAsRequiredFunction("fields", Map.class); + LongFunction fieldDataF = op.getAsRequiredFunction("fields", Map.class); LongFunction> fieldsF = l -> { - Map fieldmap = fielddata.apply(l); + Map fieldmap = fieldDataF.apply(l); List fields = new ArrayList<>(); - fieldmap.forEach((name,value)->{ - fields.add(new InsertParam.Field(name,(List)value)); + fieldmap.forEach((name, value) -> { + fields.add(new InsertParam.Field(name, (List) value)); }); return fields; }; diff --git a/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/opdispensers/MilvusSearchOpDispenser.java b/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/opdispensers/MilvusSearchOpDispenser.java new file mode 100644 index 000000000..1a86fe6b4 --- /dev/null +++ b/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/opdispensers/MilvusSearchOpDispenser.java @@ -0,0 +1,152 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.milvus.opdispensers; + +import io.milvus.client.MilvusServiceClient; +import io.milvus.common.clientenum.ConsistencyLevelEnum; +import io.milvus.param.MetricType; +import io.milvus.param.dml.SearchParam; +import io.nosqlbench.adapter.milvus.MilvusDriverAdapter; +import io.nosqlbench.adapter.milvus.ops.MilvusSearchOp; +import io.nosqlbench.adapters.api.templating.ParsedOp; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.LongFunction; + +public class MilvusSearchOpDispenser extends MilvusOpDispenser { + private static final Logger logger = LogManager.getLogger(MilvusSearchOpDispenser.class); + + /** + * Create a new {@ link MilvusSearchOpDispenser} subclassed from {@link MilvusOpDispenser}. + * + * @param adapter The associated {@link MilvusDriverAdapter} + * @param op The {@link ParsedOp} encapsulating the activity for this cycle + * @param targetFunction A LongFunction that returns the specified Milvus Index for this Op + */ + public MilvusSearchOpDispenser( + MilvusDriverAdapter adapter, + ParsedOp op, + LongFunction targetFunction) { + super(adapter, op, targetFunction); + } + + @Override + public LongFunction createOpFunc( + LongFunction clientF, + ParsedOp op, + LongFunction targetF) { + LongFunction f = + l -> SearchParam.newBuilder().withCollectionName(targetF.apply(l)); + f = op.enhanceFuncOptionally(f, "partition_name", String.class, SearchParam.Builder::addPartitionName); + if(op.isDefined("partition_names", List.class)) { + LongFunction> partitionNamesF = createPartitionNamesF(op); + LongFunction finalF = f; + f = l -> finalF.apply(l).withPartitionNames(partitionNamesF.apply(l)); + f = op.enhanceFuncOptionally(f, "partition_names", List.class, SearchParam.Builder::withPartitionNames); + } + f = op.enhanceFuncOptionally(f, "out_field", String.class, SearchParam.Builder::addOutField); + if(op.isDefined("out_fields", List.class)) { + LongFunction> outFieldsF = createOutFieldsF(op); + LongFunction finalF1 = f; + f = l -> finalF1.apply(l).withOutFields(outFieldsF.apply(l)); + } + + f = op.enhanceEnumOptionally(f, "consistency_level", ConsistencyLevelEnum.class, SearchParam.Builder::withConsistencyLevel); + f = op.enhanceFuncOptionally(f, "expr", String.class, SearchParam.Builder::withExpr); + f = op.enhanceDefaultFunc(f, "top_k", Integer.class, 100, SearchParam.Builder::withTopK); + f = op.enhanceDefaultFunc(f, "metric_type", MetricType.class, MetricType.COSINE, SearchParam.Builder::withMetricType); + f = op.enhanceFuncOptionally(f, "round_decimal", Integer.class, SearchParam.Builder::withRoundDecimal); + f = op.enhanceFuncOptionally(f, "ignore_growing", Boolean.class, SearchParam.Builder::withIgnoreGrowing); + f = op.enhanceFuncOptionally(f, "params", String.class, SearchParam.Builder::withParams); + + f = op.enhanceFunc(f, "vector_field_name", String.class, SearchParam.Builder::withVectorFieldName); + LongFunction> queryVectorsF = createQueryVectorsF(op); + LongFunction finalF2 = f; + f = l -> finalF2.apply(l).withVectors(queryVectorsF.apply(l)); + + LongFunction searchParamsF = f; + LongFunction searchOpF = l -> new MilvusSearchOp(clientF.apply(l), searchParamsF.apply(l).build()); + return searchOpF; + } + + /** + * Prepare the {@code query_vectors} parameter list for the search operation. + * @param op {@link ParsedOp} + * @return {@link LongFunction>} + */ + private LongFunction> createQueryVectorsF(ParsedOp op) { + LongFunction outVectorF = op.getAsRequiredFunction("vectors", Map.class); + LongFunction> outFieldsListF = l -> { + Map fieldmap = outVectorF.apply(l); + //List floatVectorList = new ArrayList(); + //List byteBufferVectorList = new ArrayList<>(); + List finalVectorList = new ArrayList<>(); + fieldmap.forEach((name, value) -> { + // TODO - validate if we really need to do these type checking here or let the DB barf at us if we + // use it otherwise https://milvus.io/api-reference/java/v2.3.x/Query%20and%20Search/search().md +// if(value instanceof Float) { +// floatVectorList.add((Float) value); +// } else { +// byteBufferVectorList.add((ByteBuffer) value); +// } + finalVectorList.add(value); + }); + return finalVectorList; + }; + return outFieldsListF; + } + + /** + * Prepare the {@code out_fields} parameter list for the search operation. + * @param op {@link ParsedOp} + * @return {@link LongFunction>} + */ + private LongFunction> createOutFieldsF(ParsedOp op) { + LongFunction outFieldDataF = op.getAsRequiredFunction("out_fields", Map.class); + LongFunction> outFieldsListF = l -> { + Map fieldmap = outFieldDataF.apply(l); + List fields = new ArrayList<>(); + fieldmap.forEach((name, value) -> { + fields.add(String.valueOf(value)); + }); + return fields; + }; + return outFieldsListF; + } + + /** + * Prepare the {@code partition_names} parameter list for the search operation. + * @param op {@link ParsedOp} + * @return {@link LongFunction>} + */ + private LongFunction> createPartitionNamesF(ParsedOp op) { + LongFunction partitionNamesDataF = op.getAsRequiredFunction("partition_names", Map.class); + LongFunction> partitionNamesListF = l -> { + Map fieldmap = partitionNamesDataF.apply(l); + List fields = new ArrayList<>(); + fieldmap.forEach((name, value) -> { + fields.add(String.valueOf(value)); + }); + return fields; + }; + return partitionNamesListF; + } +} diff --git a/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusCreateIndexOp.java b/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusCreateIndexOp.java index 2272d9cff..bf6dca1c1 100644 --- a/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusCreateIndexOp.java +++ b/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusCreateIndexOp.java @@ -19,7 +19,6 @@ package io.nosqlbench.adapter.milvus.ops; import io.milvus.client.MilvusServiceClient; import io.milvus.param.R; import io.milvus.param.RpcStatus; -import io.milvus.param.collection.CreateCollectionParam; import io.milvus.param.index.CreateIndexParam; import io.nosqlbench.adapters.api.templating.ParsedOp; import org.apache.logging.log4j.LogManager; @@ -29,6 +28,12 @@ public class MilvusCreateIndexOp extends MilvusOp { private static final Logger logger = LogManager.getLogger(MilvusCreateIndexOp.class); private final CreateIndexParam request; + /** + * Create a new {@link ParsedOp} encapsulating a call to the Milvus/Zilliz client delete method + * + * @param client The associated {@link MilvusServiceClient} used to communicate with the database + * @param request The {@link CreateIndexParam} built for this operation + */ public MilvusCreateIndexOp(MilvusServiceClient client, CreateIndexParam request) { super(client); this.request = request; diff --git a/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusDropIndexOp.java b/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusDropIndexOp.java index 24e8b40ce..1415d68a9 100644 --- a/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusDropIndexOp.java +++ b/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusDropIndexOp.java @@ -17,8 +17,6 @@ package io.nosqlbench.adapter.milvus.ops; import io.milvus.client.MilvusServiceClient; -import io.milvus.grpc.DropIndexRequest; -import io.milvus.param.collection.DropCollectionParam; import io.milvus.param.index.DropIndexParam; import io.nosqlbench.adapters.api.templating.ParsedOp; import org.apache.logging.log4j.LogManager; @@ -32,7 +30,7 @@ public class MilvusDropIndexOp extends MilvusOp { * Create a new {@link ParsedOp} encapsulating a call to the Milvus/Zilliz client delete method * * @param client The associated {@link MilvusServiceClient} used to communicate with the database - * @param request The {@link DropCollectionParam} built for this operation + * @param request The {@link DropIndexParam} built for this operation */ public MilvusDropIndexOp(MilvusServiceClient client, DropIndexParam request) { super(client); diff --git a/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusInsertOp.java b/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusInsertOp.java index 685dc42f3..4cdf09823 100644 --- a/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusInsertOp.java +++ b/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusInsertOp.java @@ -19,8 +19,6 @@ package io.nosqlbench.adapter.milvus.ops; import io.milvus.client.MilvusServiceClient; import io.milvus.grpc.MutationResult; import io.milvus.param.R; -import io.milvus.param.RpcStatus; -import io.milvus.param.collection.CreateCollectionParam; import io.milvus.param.dml.InsertParam; import io.nosqlbench.adapters.api.templating.ParsedOp; import org.apache.logging.log4j.LogManager; @@ -34,7 +32,7 @@ public class MilvusInsertOp extends MilvusOp { * Create a new {@link ParsedOp} encapsulating a call to the Milvus/Zilliz client delete method * * @param client The associated {@link MilvusServiceClient} used to communicate with the database - * @param request The {@link CreateCollectionParam} built for this operation + * @param request The {@link InsertParam} built for this operation */ public MilvusInsertOp(MilvusServiceClient client, InsertParam request) { super(client); @@ -43,7 +41,7 @@ public class MilvusInsertOp extends MilvusOp { @Override public R applyOp(long value) { - logger.debug("Milvus/Zilliz create collection request"); + logger.debug("Milvus/Zilliz insert data request"); R response = client.insert(request); return response; } diff --git a/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusOpTypes.java b/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusOpTypes.java index 7517790a6..b11e44d2d 100644 --- a/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusOpTypes.java +++ b/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusOpTypes.java @@ -22,10 +22,6 @@ public enum MilvusOpTypes { create_index, drop_index, - insert -// update, -// upsert, -// delete, -// describeindexstats, -// fetch + insert, + search, } diff --git a/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusSearchOp.java b/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusSearchOp.java new file mode 100644 index 000000000..571894081 --- /dev/null +++ b/adapter-milvus/src/main/java/io/nosqlbench/adapter/milvus/ops/MilvusSearchOp.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.nosqlbench.adapter.milvus.ops; + +import io.milvus.client.MilvusServiceClient; +import io.milvus.grpc.SearchResults; +import io.milvus.param.R; +import io.milvus.param.dml.SearchParam; +import io.nosqlbench.adapters.api.templating.ParsedOp; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class MilvusSearchOp extends MilvusOp { + + /** + * Create a new {@link ParsedOp} encapsulating a call to the Milvus/Zilliz client delete method + * + * @param client The associated {@link MilvusServiceClient} used to communicate with the database + * @param request The {@link SearchParam} built for this operation + */ + public MilvusSearchOp(MilvusServiceClient client, SearchParam request) { + super(client, request); + } + + @Override + public R applyOp(long value) { + R response = client.search(request); + return response; + } +} diff --git a/adapter-milvus/src/main/resources/activities/milvus.yaml b/adapter-milvus/src/main/resources/activities/milvus.yaml index 911b9b5e3..a43d47b42 100644 --- a/adapter-milvus/src/main/resources/activities/milvus.yaml +++ b/adapter-milvus/src/main/resources/activities/milvus.yaml @@ -64,19 +64,21 @@ blocks: insert_op: insert: collection_name: "TEMPLATE(collection,vector)" - field: + fields: key: {row_key} value: {row_vector} search: ops: # https://milvus.io/api-reference/java/v2.3.x/High-level%20API/search().md + # https://milvus.io/api-reference/java/v2.3.x/Query%20and%20Search/search().md search_op: search: collection_name: "TEMPLATE(collection,vector)" output_fields: - key - value + vector_field_name: "value" vectors: {row_vector} - limit: TEMPLATE(top_k,100) + top_k: TEMPLATE(top_k,100) consistency_level: "TEMPLATE(read_cl,EVENTUALLY)"