Implement search op

This commit is contained in:
Madhavan Sridharan 2024-03-18 13:56:16 -04:00
parent b4a5a0177a
commit 7a1f6f3b2e
9 changed files with 226 additions and 36 deletions

View File

@ -33,7 +33,7 @@ public class MilvusOpMapper implements OpMapper<MilvusOp> {
/**
* Create a new MilvusOpMapper implementing the {@link OpMapper} interface.
*
* @param adapter The associated {@link MilvusDriverAdapter}
* @param adapter The associated {@link MilvusDriverAdapter}
*/
public MilvusOpMapper(MilvusDriverAdapter adapter) {
this.adapter = adapter;
@ -42,8 +42,8 @@ public class MilvusOpMapper implements OpMapper<MilvusOp> {
/**
* Given an instance of a {@link ParsedOp} returns the appropriate {@link MilvusOpDispenser} subclass
*
* @param op The ParsedOp to be evaluated
* @return The correct MilvusOpDispenser subclass based on the op type
* @param op The {@link ParsedOp} to be evaluated
* @return The correct {@link MilvusOpDispenser} subclass based on the op type
*/
@Override
public OpDispenser<? extends MilvusOp> apply(ParsedOp op) {
@ -57,10 +57,11 @@ public class MilvusOpMapper implements OpMapper<MilvusOp> {
return switch (typeAndTarget.enumId) {
case drop_collection -> new MilvusDropCollectionOpDispenser(adapter, op, typeAndTarget.targetFunction);
case create_collection -> new MilvusCreateCollectionOpDispenser(adapter, op, typeAndTarget.targetFunction);
case create_index -> new MilvusCreateIndexOpDispenser(adapter,op, typeAndTarget.targetFunction);
case drop_index -> new MilvusDropIndexOpDispenser(adapter,op,typeAndTarget.targetFunction);
case insert -> new MilvusInsertOpDispenser(adapter,op,typeAndTarget.targetFunction);
case create_collection -> new MilvusCreateCollectionOpDispenser(adapter, op, typeAndTarget.targetFunction);
case create_index -> new MilvusCreateIndexOpDispenser(adapter, op, typeAndTarget.targetFunction);
case drop_index -> new MilvusDropIndexOpDispenser(adapter, op, typeAndTarget.targetFunction);
case insert -> new MilvusInsertOpDispenser(adapter, op, typeAndTarget.targetFunction);
case search -> new MilvusSearchOpDispenser(adapter, op, typeAndTarget.targetFunction);
default -> throw new RuntimeException("Unrecognized op type '" + typeAndTarget.enumId.name() + "' while " +
"mapping parsed op " + op);
};

View File

@ -19,12 +19,9 @@ package io.nosqlbench.adapter.milvus.opdispensers;
import com.alibaba.fastjson.JSONObject;
import com.google.gson.JsonObject;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.dml.InsertParam;
import io.milvus.param.index.DropIndexParam;
import io.nosqlbench.adapter.milvus.MilvusDriverAdapter;
import io.nosqlbench.adapter.milvus.ops.MilvusDropIndexOp;
import io.nosqlbench.adapter.milvus.ops.MilvusInsertOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
@ -41,12 +38,9 @@ public class MilvusInsertOpDispenser extends MilvusOpDispenser {
/**
* Create a new MilvusDeleteOpDispenser subclassed from {@link MilvusOpDispenser}.
*
* @param adapter
* The associated {@link MilvusDriverAdapter}
* @param op
* The {@link ParsedOp} encapsulating the activity for this cycle
* @param targetFunction
* A LongFunction that returns the specified Milvus Index for this Op
* @param adapter The associated {@link MilvusDriverAdapter}
* @param op The {@link ParsedOp} encapsulating the activity for this cycle
* @param targetFunction A LongFunction that returns the specified Milvus Index for this Op
*/
public MilvusInsertOpDispenser(MilvusDriverAdapter adapter,
ParsedOp op,
@ -80,12 +74,12 @@ public class MilvusInsertOpDispenser extends MilvusOpDispenser {
}
private LongFunction<List<InsertParam.Field>> createFieldsF(ParsedOp op) {
LongFunction<Map> fielddata = op.getAsRequiredFunction("fields", Map.class);
LongFunction<Map> fieldDataF = op.getAsRequiredFunction("fields", Map.class);
LongFunction<List<InsertParam.Field>> fieldsF = l -> {
Map<String,Object> fieldmap = fielddata.apply(l);
Map<String, Object> fieldmap = fieldDataF.apply(l);
List<InsertParam.Field> fields = new ArrayList<>();
fieldmap.forEach((name,value)->{
fields.add(new InsertParam.Field(name,(List)value));
fieldmap.forEach((name, value) -> {
fields.add(new InsertParam.Field(name, (List) value));
});
return fields;
};

View File

@ -0,0 +1,152 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.milvus.opdispensers;
import io.milvus.client.MilvusServiceClient;
import io.milvus.common.clientenum.ConsistencyLevelEnum;
import io.milvus.param.MetricType;
import io.milvus.param.dml.SearchParam;
import io.nosqlbench.adapter.milvus.MilvusDriverAdapter;
import io.nosqlbench.adapter.milvus.ops.MilvusSearchOp;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.LongFunction;
public class MilvusSearchOpDispenser extends MilvusOpDispenser {
private static final Logger logger = LogManager.getLogger(MilvusSearchOpDispenser.class);
/**
* Create a new {@ link MilvusSearchOpDispenser} subclassed from {@link MilvusOpDispenser}.
*
* @param adapter The associated {@link MilvusDriverAdapter}
* @param op The {@link ParsedOp} encapsulating the activity for this cycle
* @param targetFunction A LongFunction that returns the specified Milvus Index for this Op
*/
public MilvusSearchOpDispenser(
MilvusDriverAdapter adapter,
ParsedOp op,
LongFunction<String> targetFunction) {
super(adapter, op, targetFunction);
}
@Override
public LongFunction<MilvusSearchOp> createOpFunc(
LongFunction<MilvusServiceClient> clientF,
ParsedOp op,
LongFunction<String> targetF) {
LongFunction<SearchParam.Builder> f =
l -> SearchParam.newBuilder().withCollectionName(targetF.apply(l));
f = op.enhanceFuncOptionally(f, "partition_name", String.class, SearchParam.Builder::addPartitionName);
if(op.isDefined("partition_names", List.class)) {
LongFunction<List<String>> partitionNamesF = createPartitionNamesF(op);
LongFunction<SearchParam.Builder> finalF = f;
f = l -> finalF.apply(l).withPartitionNames(partitionNamesF.apply(l));
f = op.enhanceFuncOptionally(f, "partition_names", List.class, SearchParam.Builder::withPartitionNames);
}
f = op.enhanceFuncOptionally(f, "out_field", String.class, SearchParam.Builder::addOutField);
if(op.isDefined("out_fields", List.class)) {
LongFunction<List<String>> outFieldsF = createOutFieldsF(op);
LongFunction<SearchParam.Builder> finalF1 = f;
f = l -> finalF1.apply(l).withOutFields(outFieldsF.apply(l));
}
f = op.enhanceEnumOptionally(f, "consistency_level", ConsistencyLevelEnum.class, SearchParam.Builder::withConsistencyLevel);
f = op.enhanceFuncOptionally(f, "expr", String.class, SearchParam.Builder::withExpr);
f = op.enhanceDefaultFunc(f, "top_k", Integer.class, 100, SearchParam.Builder::withTopK);
f = op.enhanceDefaultFunc(f, "metric_type", MetricType.class, MetricType.COSINE, SearchParam.Builder::withMetricType);
f = op.enhanceFuncOptionally(f, "round_decimal", Integer.class, SearchParam.Builder::withRoundDecimal);
f = op.enhanceFuncOptionally(f, "ignore_growing", Boolean.class, SearchParam.Builder::withIgnoreGrowing);
f = op.enhanceFuncOptionally(f, "params", String.class, SearchParam.Builder::withParams);
f = op.enhanceFunc(f, "vector_field_name", String.class, SearchParam.Builder::withVectorFieldName);
LongFunction<List<?>> queryVectorsF = createQueryVectorsF(op);
LongFunction<SearchParam.Builder> finalF2 = f;
f = l -> finalF2.apply(l).withVectors(queryVectorsF.apply(l));
LongFunction<SearchParam.Builder> searchParamsF = f;
LongFunction<MilvusSearchOp> searchOpF = l -> new MilvusSearchOp(clientF.apply(l), searchParamsF.apply(l).build());
return searchOpF;
}
/**
* Prepare the {@code query_vectors} parameter list for the search operation.
* @param op {@link ParsedOp}
* @return {@link LongFunction<List<?>>}
*/
private LongFunction<List<?>> createQueryVectorsF(ParsedOp op) {
LongFunction<Map> outVectorF = op.getAsRequiredFunction("vectors", Map.class);
LongFunction<List<?>> outFieldsListF = l -> {
Map<String, Object> fieldmap = outVectorF.apply(l);
//List<?> floatVectorList = new ArrayList<Float>();
//List<?> byteBufferVectorList = new ArrayList<>();
List<Object> finalVectorList = new ArrayList<>();
fieldmap.forEach((name, value) -> {
// TODO - validate if we really need to do these type checking here or let the DB barf at us if we
// use it otherwise https://milvus.io/api-reference/java/v2.3.x/Query%20and%20Search/search().md
// if(value instanceof Float) {
// floatVectorList.add((Float) value);
// } else {
// byteBufferVectorList.add((ByteBuffer) value);
// }
finalVectorList.add(value);
});
return finalVectorList;
};
return outFieldsListF;
}
/**
* Prepare the {@code out_fields} parameter list for the search operation.
* @param op {@link ParsedOp}
* @return {@link LongFunction<List<String>>}
*/
private LongFunction<List<String>> createOutFieldsF(ParsedOp op) {
LongFunction<Map> outFieldDataF = op.getAsRequiredFunction("out_fields", Map.class);
LongFunction<List<String>> outFieldsListF = l -> {
Map<String, Object> fieldmap = outFieldDataF.apply(l);
List<String> fields = new ArrayList<>();
fieldmap.forEach((name, value) -> {
fields.add(String.valueOf(value));
});
return fields;
};
return outFieldsListF;
}
/**
* Prepare the {@code partition_names} parameter list for the search operation.
* @param op {@link ParsedOp}
* @return {@link LongFunction<List<String>>}
*/
private LongFunction<List<String>> createPartitionNamesF(ParsedOp op) {
LongFunction<Map> partitionNamesDataF = op.getAsRequiredFunction("partition_names", Map.class);
LongFunction<List<String>> partitionNamesListF = l -> {
Map<String, Object> fieldmap = partitionNamesDataF.apply(l);
List<String> fields = new ArrayList<>();
fieldmap.forEach((name, value) -> {
fields.add(String.valueOf(value));
});
return fields;
};
return partitionNamesListF;
}
}

View File

@ -19,7 +19,6 @@ package io.nosqlbench.adapter.milvus.ops;
import io.milvus.client.MilvusServiceClient;
import io.milvus.param.R;
import io.milvus.param.RpcStatus;
import io.milvus.param.collection.CreateCollectionParam;
import io.milvus.param.index.CreateIndexParam;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
@ -29,6 +28,12 @@ public class MilvusCreateIndexOp extends MilvusOp {
private static final Logger logger = LogManager.getLogger(MilvusCreateIndexOp.class);
private final CreateIndexParam request;
/**
* Create a new {@link ParsedOp} encapsulating a call to the Milvus/Zilliz client delete method
*
* @param client The associated {@link MilvusServiceClient} used to communicate with the database
* @param request The {@link CreateIndexParam} built for this operation
*/
public MilvusCreateIndexOp(MilvusServiceClient client, CreateIndexParam request) {
super(client);
this.request = request;

View File

@ -17,8 +17,6 @@
package io.nosqlbench.adapter.milvus.ops;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.DropIndexRequest;
import io.milvus.param.collection.DropCollectionParam;
import io.milvus.param.index.DropIndexParam;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
@ -32,7 +30,7 @@ public class MilvusDropIndexOp extends MilvusOp {
* Create a new {@link ParsedOp} encapsulating a call to the Milvus/Zilliz client delete method
*
* @param client The associated {@link MilvusServiceClient} used to communicate with the database
* @param request The {@link DropCollectionParam} built for this operation
* @param request The {@link DropIndexParam} built for this operation
*/
public MilvusDropIndexOp(MilvusServiceClient client, DropIndexParam request) {
super(client);

View File

@ -19,8 +19,6 @@ package io.nosqlbench.adapter.milvus.ops;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.MutationResult;
import io.milvus.param.R;
import io.milvus.param.RpcStatus;
import io.milvus.param.collection.CreateCollectionParam;
import io.milvus.param.dml.InsertParam;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
@ -34,7 +32,7 @@ public class MilvusInsertOp extends MilvusOp {
* Create a new {@link ParsedOp} encapsulating a call to the Milvus/Zilliz client delete method
*
* @param client The associated {@link MilvusServiceClient} used to communicate with the database
* @param request The {@link CreateCollectionParam} built for this operation
* @param request The {@link InsertParam} built for this operation
*/
public MilvusInsertOp(MilvusServiceClient client, InsertParam request) {
super(client);
@ -43,7 +41,7 @@ public class MilvusInsertOp extends MilvusOp {
@Override
public R<MutationResult> applyOp(long value) {
logger.debug("Milvus/Zilliz create collection request");
logger.debug("Milvus/Zilliz insert data request");
R<MutationResult> response = client.insert(request);
return response;
}

View File

@ -22,10 +22,6 @@ public enum MilvusOpTypes {
create_index,
drop_index,
insert
// update,
// upsert,
// delete,
// describeindexstats,
// fetch
insert,
search,
}

View File

@ -0,0 +1,44 @@
/*
* Copyright (c) 2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.milvus.ops;
import io.milvus.client.MilvusServiceClient;
import io.milvus.grpc.SearchResults;
import io.milvus.param.R;
import io.milvus.param.dml.SearchParam;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class MilvusSearchOp extends MilvusOp<SearchParam> {
/**
* Create a new {@link ParsedOp} encapsulating a call to the Milvus/Zilliz client delete method
*
* @param client The associated {@link MilvusServiceClient} used to communicate with the database
* @param request The {@link SearchParam} built for this operation
*/
public MilvusSearchOp(MilvusServiceClient client, SearchParam request) {
super(client, request);
}
@Override
public R<SearchResults> applyOp(long value) {
R<SearchResults> response = client.search(request);
return response;
}
}

View File

@ -64,19 +64,21 @@ blocks:
insert_op:
insert:
collection_name: "TEMPLATE(collection,vector)"
field:
fields:
key: {row_key}
value: {row_vector}
search:
ops:
# https://milvus.io/api-reference/java/v2.3.x/High-level%20API/search().md
# https://milvus.io/api-reference/java/v2.3.x/Query%20and%20Search/search().md
search_op:
search:
collection_name: "TEMPLATE(collection,vector)"
output_fields:
- key
- value
vector_field_name: "value"
vectors: {row_vector}
limit: TEMPLATE(top_k,100)
top_k: TEMPLATE(top_k,100)
consistency_level: "TEMPLATE(read_cl,EVENTUALLY)"