mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Create collection fully working
This commit is contained in:
@@ -16,9 +16,7 @@
|
||||
|
||||
package io.nosqlbench.adapter.qdrant;
|
||||
|
||||
import io.nosqlbench.adapter.qdrant.opdispensers.QdrantBaseOpDispenser;
|
||||
import io.nosqlbench.adapter.qdrant.opdispensers.QdrantCreateCollectionOpDispenser;
|
||||
import io.nosqlbench.adapter.qdrant.opdispensers.QdrantDeleteCollectionOpDispenser;
|
||||
import io.nosqlbench.adapter.qdrant.opdispensers.*;
|
||||
import io.nosqlbench.adapter.qdrant.ops.QdrantBaseOp;
|
||||
import io.nosqlbench.adapter.qdrant.types.QdrantOpType;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
|
||||
@@ -60,6 +58,9 @@ public class QdrantOpMapper implements OpMapper<QdrantBaseOp<?>> {
|
||||
return switch (typeAndTarget.enumId) {
|
||||
case delete_collection -> new QdrantDeleteCollectionOpDispenser(adapter, op, typeAndTarget.targetFunction);
|
||||
case create_collection -> new QdrantCreateCollectionOpDispenser(adapter, op, typeAndTarget.targetFunction);
|
||||
case create_payload_index ->
|
||||
new QdrantCreatePayloadIndexOpDispenser(adapter, op, typeAndTarget.targetFunction);
|
||||
case search_points -> new QdrantSearchPointsOpDispenser(adapter, op, typeAndTarget.targetFunction);
|
||||
// default -> throw new RuntimeException("Unrecognized op type '" + typeAndTarget.enumId.name() + "' while " +
|
||||
// "mapping parsed op " + op);
|
||||
};
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
/*
|
||||
* Copyright (c) 2020-2020-2024 nosqlbench
|
||||
* Copyright (c) 2020-2024 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
@@ -21,12 +21,11 @@ import io.nosqlbench.adapter.qdrant.ops.QdrantBaseOp;
|
||||
import io.nosqlbench.adapter.qdrant.ops.QdrantCreateCollectionOp;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
import io.qdrant.client.QdrantClient;
|
||||
import io.qdrant.client.grpc.Collections.CreateCollection;
|
||||
import io.qdrant.client.grpc.Collections.Distance;
|
||||
import io.qdrant.client.grpc.Collections.VectorParams;
|
||||
import io.qdrant.client.grpc.Collections.*;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.function.LongFunction;
|
||||
@@ -57,51 +56,336 @@ public class QdrantCreateCollectionOpDispenser extends QdrantBaseOpDispenser<Cre
|
||||
LongFunction<CreateCollection.Builder> ebF =
|
||||
l -> CreateCollection.newBuilder().setCollectionName(targetF.apply(l));
|
||||
|
||||
// LongFunction<VectorParams.Builder> ebF =
|
||||
// l -> CreateCollectionParam.newBuilder().withCollectionName(targetF.apply(l));
|
||||
// VectorParams namedVectorsMap = buildNamedVectorsStruct(
|
||||
// op.getAsSubOps("vectors", ParsedOp.SubOpNaming.SubKey)
|
||||
// );
|
||||
// final LongFunction<CreateCollection.Builder> namedVectorsF = ebF;
|
||||
// ebF = l -> namedVectorsF.apply(l).setVectorsConfig(VectorsConfig.newBuilder().setParams(namedVectorsMap));
|
||||
|
||||
Map<String, VectorParams> namedVectorsMap = buildNamedVectorsStruct(
|
||||
Map<String, VectorParams> namedVectorParamsMap = buildNamedVectorsStruct(
|
||||
op.getAsSubOps("vectors", ParsedOp.SubOpNaming.SubKey)
|
||||
);
|
||||
final LongFunction<CreateCollection.Builder> namedVectorsF = ebF;
|
||||
ebF = l -> namedVectorsF.apply(l).setVectorsConfig(VectorsConfig.newBuilder().setParamsMap(
|
||||
VectorParamsMap.newBuilder().putAllMap(namedVectorParamsMap).build()));
|
||||
|
||||
// ebF = op.enhanceFuncOptionally(ebF, "shards_num", Number.class,
|
||||
// (VectorParams.Builder b, Number n) -> b.withShardsNum(n.intValue()));
|
||||
// ebF = op.enhanceFuncOptionally(ebF, "partition_num", Number.class,
|
||||
// (CreateCollectionParam.Builder b, Number n) -> b.withPartitionsNum(n.intValue()));
|
||||
// ebF = op.enhanceFuncOptionally(ebF, "description", String.class,
|
||||
// VectorParams.Builder::withDescription);
|
||||
// ebF = op.enhanceEnumOptionally(ebF, "consistency_level",
|
||||
// ConsistencyLevelEnum.class, CreateCollectionParam.Builder::withConsistencyLevel);
|
||||
// ebF = op.enhanceFuncOptionally(ebF, "database_name", String.class,
|
||||
// CreateCollectionParam.Builder::withDatabaseName);
|
||||
ebF = op.enhanceFuncOptionally(ebF, "on_disk_payload", Boolean.class,
|
||||
CreateCollection.Builder::setOnDiskPayload);
|
||||
ebF = op.enhanceFuncOptionally(ebF, "shard_number", Number.class,
|
||||
(CreateCollection.Builder b, Number n) -> b.setShardNumber(n.intValue()));
|
||||
ebF = op.enhanceFuncOptionally(ebF, "replication_factor", Number.class,
|
||||
(CreateCollection.Builder b, Number n) -> b.setReplicationFactor(n.intValue()));
|
||||
ebF = op.enhanceFuncOptionally(ebF, "write_consistency_factor", Number.class,
|
||||
(CreateCollection.Builder b, Number n) -> b.setWriteConsistencyFactor(n.intValue()));
|
||||
ebF = op.enhanceFuncOptionally(ebF, "init_from", String.class,
|
||||
CreateCollection.Builder::setInitFromCollection);
|
||||
ebF = op.enhanceFuncOptionally(ebF, "sharding_method", String.class,
|
||||
(CreateCollection.Builder b, String s) -> b.setShardingMethod(ShardingMethod.valueOf(s)));
|
||||
|
||||
// List<FieldType> fieldTypes = buildFieldTypesStruct(
|
||||
// op.getAsSubOps("field_types", ParsedOp.SubOpNaming.SubKey)
|
||||
// );
|
||||
// TODO - HERE
|
||||
// final LongFunction<VectorParams.Builder> f = ebF;
|
||||
// ebF = l -> f.apply(l).withSchema(CollectionSchemaParam.newBuilder().withFieldTypes(fieldTypes).build());
|
||||
//
|
||||
// final LongFunction<VectorParams.Builder> lastF = ebF;
|
||||
// return l -> lastF.apply(l).build();
|
||||
return l -> ebF.apply(l).build();
|
||||
WalConfigDiff walConfig = buildWalConfigDiff(op);
|
||||
final LongFunction<CreateCollection.Builder> walConfigF = ebF;
|
||||
ebF = l -> walConfigF.apply(l).setWalConfig(walConfig);
|
||||
|
||||
OptimizersConfigDiff ocDiff = buildOptimizerConfigDiff(op);
|
||||
final LongFunction<CreateCollection.Builder> ocF = ebF;
|
||||
ebF = l -> ocF.apply(l).setOptimizersConfig(ocDiff);
|
||||
|
||||
HnswConfigDiff hnswConfigDiff = buildHnswConfigDiff(op);
|
||||
final LongFunction<CreateCollection.Builder> hnswConfigF = ebF;
|
||||
ebF = l -> hnswConfigF.apply(l).setHnswConfig(hnswConfigDiff);
|
||||
|
||||
QuantizationConfig qcDiff = buildQuantizationConfig(op);
|
||||
if (qcDiff != null) {
|
||||
final LongFunction<CreateCollection.Builder> qcConfigF = ebF;
|
||||
ebF = l -> qcConfigF.apply(l).setQuantizationConfig(qcDiff);
|
||||
}
|
||||
|
||||
SparseVectorConfig sparseVectorsMap = buildSparseVectorsStruct(
|
||||
op.getAsSubOps("sparse_vectors", ParsedOp.SubOpNaming.SubKey)
|
||||
);
|
||||
final LongFunction<CreateCollection.Builder> sparseVectorsF = ebF;
|
||||
ebF = l -> sparseVectorsF.apply(l).setSparseVectorsConfig(sparseVectorsMap);
|
||||
|
||||
final LongFunction<CreateCollection.Builder> lastF = ebF;
|
||||
return l -> lastF.apply(l).build();
|
||||
}
|
||||
|
||||
private Map<String, VectorParams> buildNamedVectorsStruct(Map<String, ParsedOp> namedVectorsData) {
|
||||
/**
|
||||
* Build the {@link OptimizersConfigDiff} from the provided {@link ParsedOp}.
|
||||
*
|
||||
* @param op {@link ParsedOp} containing the optimizer config data.
|
||||
* @return {@link OptimizersConfigDiff} containing the optimizer config data
|
||||
*/
|
||||
private OptimizersConfigDiff buildOptimizerConfigDiff(ParsedOp op) {
|
||||
OptimizersConfigDiff.Builder ocDiffBuilder = OptimizersConfigDiff.newBuilder();
|
||||
op.getOptionalStaticValue("optimizers_config", Map.class).ifPresent(ocData -> {
|
||||
if (ocData.isEmpty()) {
|
||||
return;
|
||||
} else {
|
||||
if (ocData.containsKey("deleted_threshold")) {
|
||||
ocDiffBuilder.setDeletedThreshold(((Number) ocData.get("deleted_threshold")).doubleValue());
|
||||
}
|
||||
if (ocData.containsKey("vacuum_min_vector_number")) {
|
||||
ocDiffBuilder.setVacuumMinVectorNumber(((Number) ocData.get("vacuum_min_vector_number")).longValue());
|
||||
}
|
||||
if (ocData.containsKey("default_segment_number")) {
|
||||
ocDiffBuilder.setDefaultSegmentNumber(((Number) ocData.get("default_segment_number")).longValue());
|
||||
}
|
||||
if (ocData.containsKey("max_segment_size")) {
|
||||
ocDiffBuilder.setMaxSegmentSize(((Number) ocData.get("max_segment_size")).longValue());
|
||||
}
|
||||
if (ocData.containsKey("memmap_threshold")) {
|
||||
ocDiffBuilder.setMemmapThreshold(((Number) ocData.get("memmap_threshold")).longValue());
|
||||
}
|
||||
if (ocData.containsKey("indexing_threshold")) {
|
||||
ocDiffBuilder.setIndexingThreshold(((Number) ocData.get("indexing_threshold")).longValue());
|
||||
}
|
||||
if (ocData.containsKey(("flush_interval_sec"))) {
|
||||
ocDiffBuilder.setFlushIntervalSec(((Number) ocData.get("flush_interval_sec")).longValue());
|
||||
}
|
||||
if (ocData.containsKey("max_optimization_threads")) {
|
||||
ocDiffBuilder.setMaxOptimizationThreads(((Number) ocData.get("max_optimization_threads")).intValue());
|
||||
}
|
||||
}
|
||||
});
|
||||
return ocDiffBuilder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the {@link WalConfigDiff} from the provided {@link ParsedOp}.
|
||||
*
|
||||
* @param op {@link ParsedOp} containing the WAL config data.
|
||||
* @return {@link WalConfigDiff} containing the WAL config data
|
||||
*/
|
||||
private WalConfigDiff buildWalConfigDiff(ParsedOp op) {
|
||||
WalConfigDiff.Builder walConfigDiffBuilder = WalConfigDiff.newBuilder();
|
||||
op.getOptionalStaticValue("wal_config", Map.class).ifPresent(walConfigData -> {
|
||||
if (walConfigData.isEmpty()) {
|
||||
return;
|
||||
} else {
|
||||
if (walConfigData.containsKey("wal_capacity_mb")) {
|
||||
walConfigDiffBuilder.setWalCapacityMb(((Number) walConfigData.get("wal_capacity_mb")).longValue());
|
||||
}
|
||||
if (walConfigData.containsKey("wal_segments_ahead")) {
|
||||
walConfigDiffBuilder.setWalSegmentsAhead(((Number) walConfigData.get("wal_segments_ahead")).longValue());
|
||||
}
|
||||
}
|
||||
});
|
||||
return walConfigDiffBuilder.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Only named vectors are supported at this time in this driver.
|
||||
*
|
||||
* @param {@link Map<String, ParsedOp>} namedVectorsData
|
||||
* @return {@link VectorParams} containing the named vectors
|
||||
*/
|
||||
private Map<String, VectorParams>/*VectorParams*/ buildNamedVectorsStruct(Map<String, ParsedOp> namedVectorsData) {
|
||||
// if (namedVectorsData.size() != 1) {
|
||||
// // TODO - we need this form somehow to support the mapped version
|
||||
// // https://github.com/qdrant/java-client/blob/v1.9.0/src/main/java/io/qdrant/client/QdrantClient.java#L232-L243
|
||||
// throw new UnsupportedOperationException("Empty or more than one named vectors are not supported at this time");
|
||||
// }
|
||||
Map<String, VectorParams> namedVectors = new HashMap<>();
|
||||
namedVectorsData.forEach((name, fieldspec) -> {
|
||||
VectorParams.Builder builder = VectorParams.newBuilder();
|
||||
// TODO - these are mandatory items; see how to achieve this.
|
||||
fieldspec.getOptionalStaticConfig("distance", Distance.class)
|
||||
.ifPresent(builder::setDistance);
|
||||
fieldspec.getOptionalStaticConfig("size", Number.class)
|
||||
.ifPresent((Number n) -> builder.setSize(n.intValue()));
|
||||
VectorParams.Builder builder = VectorParams.newBuilder();
|
||||
namedVectorsData.forEach((name, fieldSpec) -> {
|
||||
builder.setDistanceValue(fieldSpec.getStaticValue("distance_value", Number.class).intValue());
|
||||
builder.setSize(fieldSpec.getStaticValue("size", Number.class).longValue());
|
||||
fieldSpec.getOptionalStaticValue("on_disk", Boolean.class)
|
||||
.ifPresent(builder::setOnDisk);
|
||||
fieldSpec.getOptionalStaticValue("datatype_value", Number.class)
|
||||
.ifPresent((Number value) -> builder.setDatatypeValue(value.intValue()));
|
||||
|
||||
builder.setHnswConfig(buildHnswConfigDiff(fieldSpec));
|
||||
builder.setQuantizationConfig(buildQuantizationConfig(fieldSpec));
|
||||
|
||||
namedVectors.put(name, builder.build());
|
||||
});
|
||||
//return builder.build();
|
||||
return namedVectors;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the {@link QuantizationConfig} from the provided {@link ParsedOp}.
|
||||
*
|
||||
* @param fieldSpec The {@link ParsedOp} containing the quantization config data
|
||||
* @return The {@link QuantizationConfig} built from the provided {@link ParsedOp}
|
||||
* @see <a href="https://qdrant.tech/documentation/guides/quantization/#setting-up-quantization-in-qdrant">Quantization Config</a>
|
||||
*/
|
||||
private QuantizationConfig buildQuantizationConfig(ParsedOp fieldSpec) {
|
||||
QuantizationConfig.Builder qcBuilder = QuantizationConfig.newBuilder();
|
||||
fieldSpec.getOptionalStaticValue("quantization_config", Map.class).ifPresent(qcData -> {
|
||||
if (qcData.isEmpty()) {
|
||||
return;
|
||||
} else {
|
||||
// TODO - Approach #1 - feels ugly
|
||||
Arrays.asList("binary", "product", "scalar")
|
||||
.forEach(key -> {
|
||||
if (qcData.containsKey(key)) {
|
||||
switch (key) {
|
||||
case "binary":
|
||||
BinaryQuantization.Builder binaryBuilder = BinaryQuantization.newBuilder();
|
||||
Map<?, ?> binaryQCData = (Map<?, ?>) qcData.get("binary");
|
||||
if (null != binaryQCData && !binaryQCData.isEmpty()) {
|
||||
if (binaryQCData.containsKey("always_ram")) {
|
||||
binaryBuilder.setAlwaysRam((Boolean) binaryQCData.get("always_ram"));
|
||||
}
|
||||
}
|
||||
qcBuilder.setBinary(binaryBuilder);
|
||||
break;
|
||||
case "product":
|
||||
ProductQuantization.Builder productBuilder = ProductQuantization.newBuilder();
|
||||
Map<?, ?> productQCData = (Map<?, ?>) qcData.get("product");
|
||||
if (null != productQCData && !productQCData.isEmpty()) {
|
||||
// Mandatory field
|
||||
productBuilder.setAlwaysRam((Boolean) productQCData.get("always_ram"));
|
||||
// Optional field(s) below
|
||||
if (productQCData.containsKey("compression")) {
|
||||
productBuilder.setCompression(CompressionRatio.valueOf((String) productQCData.get("compression")));
|
||||
}
|
||||
}
|
||||
qcBuilder.setProduct(productBuilder);
|
||||
break;
|
||||
case "scalar":
|
||||
ScalarQuantization.Builder scalarBuilder = ScalarQuantization.newBuilder();
|
||||
Map<?, ?> scalarQCData = (Map<?, ?>) qcData.get("scalar");
|
||||
if (null != scalarQCData && !scalarQCData.isEmpty()) {
|
||||
// Mandatory field
|
||||
scalarBuilder.setType(QuantizationType.forNumber(((Number) scalarQCData.get("type")).intValue()));
|
||||
// Optional field(s) below
|
||||
if (scalarQCData.containsKey("always_ram")) {
|
||||
scalarBuilder.setAlwaysRam((Boolean) scalarQCData.get("always_ram"));
|
||||
}
|
||||
if (scalarQCData.containsKey("quantile")) {
|
||||
scalarBuilder.setQuantile(((Number) scalarQCData.get("quantile")).floatValue());
|
||||
}
|
||||
}
|
||||
qcBuilder.setScalar(scalarBuilder);
|
||||
break;
|
||||
}
|
||||
}
|
||||
});
|
||||
// TODO - Approach #2 - equally feels ugly too.
|
||||
// if (qcData.containsKey("binary")) {
|
||||
// if (qcData.containsKey("scalar") || qcData.containsKey("product")) {
|
||||
// throw new UnsupportedOperationException("Only one of binary, scalar, or product can be specified for quantization config");
|
||||
// }
|
||||
// BinaryQuantization.Builder binaryBuilder = BinaryQuantization.newBuilder();
|
||||
// Map<?, ?> binaryQCData = (Map<?, ?>) qcData.get("binary");
|
||||
// if (null != binaryQCData && !binaryQCData.isEmpty()) {
|
||||
// if (binaryQCData.containsKey("always_ram")) {
|
||||
// binaryBuilder.setAlwaysRam((Boolean) binaryQCData.get("always_ram"));
|
||||
// }
|
||||
// }
|
||||
// qcBuilder.setBinary(binaryBuilder);
|
||||
// } else if (qcData.containsKey("product")) {
|
||||
// if (qcData.containsKey("binary") || qcData.containsKey("scalar")) {
|
||||
// throw new UnsupportedOperationException("Only one of binary, scalar, or product can be specified for quantization config");
|
||||
// }
|
||||
// ProductQuantization.Builder productBuilder = ProductQuantization.newBuilder();
|
||||
// Map<?, ?> productQCData = (Map<?, ?>) qcData.get("product");
|
||||
// if (null != productQCData && !productQCData.isEmpty()) {
|
||||
// // Mandatory field
|
||||
// productBuilder.setAlwaysRam((Boolean) productQCData.get("always_ram"));
|
||||
// // Optional field(s) below
|
||||
// if (productQCData.containsKey("compression")) {
|
||||
// productBuilder.setCompression(CompressionRatio.valueOf((String) productQCData.get("compression")));
|
||||
// }
|
||||
// }
|
||||
// qcBuilder.setProduct(productBuilder);
|
||||
// } else if (qcData.containsKey("scalar")) {
|
||||
// if (qcData.containsKey("binary") || qcData.containsKey("product")) {
|
||||
// throw new UnsupportedOperationException("Only one of binary, scalar, or product can be specified for quantization config");
|
||||
// }
|
||||
// ScalarQuantization.Builder scalarBuilder = ScalarQuantization.newBuilder();
|
||||
// Map<?, ?> scalarQCData = (Map<?, ?>) qcData.get("scalar");
|
||||
// if (null != scalarQCData && !scalarQCData.isEmpty()) {
|
||||
// // Mandatory field
|
||||
// scalarBuilder.setType(QuantizationType.valueOf((String) scalarQCData.get("type")));
|
||||
// // Optional field(s) below
|
||||
// if (scalarQCData.containsKey("always_ram")) {
|
||||
// scalarBuilder.setAlwaysRam((Boolean) scalarQCData.get("always_ram"));
|
||||
// }
|
||||
// if (scalarQCData.containsKey("quantile")) {
|
||||
// scalarBuilder.setQuantile((Float) scalarQCData.get("quantile"));
|
||||
// }
|
||||
// }
|
||||
// qcBuilder.setScalar(scalarBuilder);
|
||||
// }
|
||||
}
|
||||
});
|
||||
|
||||
// The below check is required to avoid INVALID_ARGUMENT: Unable to convert quantization config
|
||||
if (qcBuilder.hasBinary() || qcBuilder.hasProduct() || qcBuilder.hasScalar()) {
|
||||
return qcBuilder.build();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the {@link HnswConfigDiff} from the provided {@link ParsedOp}.
|
||||
*
|
||||
* @param fieldSpec The {@link ParsedOp} containing the hnsw config data
|
||||
* @return The {@link HnswConfigDiff} built from the provided {@link ParsedOp}
|
||||
* @see <a href="https://qdrant.tech/documentation/concepts/indexing/#vector-index">HNSW Config</a>
|
||||
*/
|
||||
private HnswConfigDiff buildHnswConfigDiff(ParsedOp fieldSpec) {
|
||||
HnswConfigDiff.Builder hnswConfigBuilder = HnswConfigDiff.newBuilder();
|
||||
fieldSpec.getOptionalStaticValue("hnsw_config", Map.class).ifPresent(hnswConfigData -> {
|
||||
if (hnswConfigData.isEmpty()) {
|
||||
return;
|
||||
} else {
|
||||
if (hnswConfigData.containsKey("ef_construct")) {
|
||||
hnswConfigBuilder.setEfConstruct(((Number) hnswConfigData.get("ef_construct")).longValue());
|
||||
}
|
||||
if (hnswConfigData.containsKey("m")) {
|
||||
hnswConfigBuilder.setM(((Number) hnswConfigData.get("m")).intValue());
|
||||
}
|
||||
if (hnswConfigData.containsKey("full_scan_threshold")) {
|
||||
hnswConfigBuilder.setFullScanThreshold(((Number) hnswConfigData.get("full_scan_threshold")).intValue());
|
||||
}
|
||||
if (hnswConfigData.containsKey("max_indexing_threads")) {
|
||||
hnswConfigBuilder.setMaxIndexingThreads(((Number) hnswConfigData.get("max_indexing_threads")).intValue());
|
||||
}
|
||||
if (hnswConfigData.containsKey("on_disk")) {
|
||||
hnswConfigBuilder.setOnDisk((Boolean) hnswConfigData.get("on_disk"));
|
||||
}
|
||||
if (hnswConfigData.containsKey("payload_m")) {
|
||||
hnswConfigBuilder.setPayloadM(((Number) hnswConfigData.get("payload_m")).intValue());
|
||||
}
|
||||
}
|
||||
});
|
||||
// if (hnswConfigBuilder.hasM() || hnswConfigBuilder.hasEfConstruct() || hnswConfigBuilder.hasOnDisk()
|
||||
// || hnswConfigBuilder.hasPayloadM() || hnswConfigBuilder.hasFullScanThreshold()
|
||||
// || hnswConfigBuilder.hasMaxIndexingThreads()) {
|
||||
return hnswConfigBuilder.build();
|
||||
// }
|
||||
// return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the {@link SparseVectorConfig} from the provided {@link ParsedOp}.
|
||||
*
|
||||
* @param sparseVectorsData The {@link ParsedOp} containing the sparse vectors data
|
||||
* @return The {@link SparseVectorConfig} built from the provided {@link ParsedOp}
|
||||
*/
|
||||
private SparseVectorConfig buildSparseVectorsStruct(Map<String, ParsedOp> sparseVectorsData) {
|
||||
SparseVectorConfig.Builder builder = SparseVectorConfig.newBuilder();
|
||||
sparseVectorsData.forEach((name, fieldSpec) -> {
|
||||
SparseVectorParams.Builder svpBuilder = SparseVectorParams.newBuilder();
|
||||
SparseIndexConfig.Builder sicBuilder = SparseIndexConfig.newBuilder();
|
||||
|
||||
fieldSpec.getOptionalStaticValue("full_scan_threshold", Number.class)
|
||||
.ifPresent((Number value) -> sicBuilder.setFullScanThreshold(value.intValue()));
|
||||
fieldSpec.getOptionalStaticValue("on_disk", Boolean.class)
|
||||
.ifPresent(sicBuilder::setOnDisk);
|
||||
|
||||
svpBuilder.setIndex(sicBuilder);
|
||||
builder.putMap(name, svpBuilder.build());
|
||||
});
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
// https://qdrant.tech/documentation/concepts/collections/#create-a-collection
|
||||
@Override
|
||||
public LongFunction<QdrantBaseOp<CreateCollection>> createOpFunc(
|
||||
@@ -112,42 +396,4 @@ public class QdrantCreateCollectionOpDispenser extends QdrantBaseOpDispenser<Cre
|
||||
) {
|
||||
return l -> new QdrantCreateCollectionOp(clientF.apply(l), paramF.apply(l));
|
||||
}
|
||||
|
||||
/**
|
||||
* Function to build the {@link FieldType}s for the {@link VectorParams}.
|
||||
*
|
||||
* @param fieldTypesData The static map of config data from the create collection request
|
||||
* @return a list of static field types
|
||||
*/
|
||||
// private List<FieldType> buildFieldTypesStruct(Map<String, ParsedOp> fieldTypesData) {
|
||||
// List<FieldType> fieldTypes = new ArrayList<>();
|
||||
// fieldTypesData.forEach((name, fieldspec) -> {
|
||||
// FieldType.Builder builder = FieldType.newBuilder()
|
||||
// .withName(name);
|
||||
//
|
||||
// fieldspec.getOptionalStaticValue("primary_key", Boolean.class)
|
||||
// .ifPresent(builder::withPrimaryKey);
|
||||
// fieldspec.getOptionalStaticValue("auto_id", Boolean.class)
|
||||
// .ifPresent(builder::withAutoID);
|
||||
// fieldspec.getOptionalStaticConfig("max_length", Number.class)
|
||||
// .ifPresent((Number n) -> builder.withMaxLength(n.intValue()));
|
||||
// fieldspec.getOptionalStaticConfig("max_capacity", Number.class)
|
||||
// .ifPresent((Number n) -> builder.withMaxCapacity(n.intValue()));
|
||||
// fieldspec.getOptionalStaticValue(List.of("partition_key", "partition"), Boolean.class)
|
||||
// .ifPresent(builder::withPartitionKey);
|
||||
// fieldspec.getOptionalStaticValue("dimension", Number.class)
|
||||
// .ifPresent((Number n) -> builder.withDimension(n.intValue()));
|
||||
// fieldspec.getOptionalStaticConfig("data_type", String.class)
|
||||
// .map(DataType::valueOf)
|
||||
// .ifPresent(builder::withDataType);
|
||||
// fieldspec.getOptionalStaticConfig("type_params", Map.class)
|
||||
// .ifPresent(builder::withTypeParams);
|
||||
// fieldspec.getOptionalStaticConfig("element_type", String.class)
|
||||
// .map(DataType::valueOf)
|
||||
// .ifPresent(builder::withElementType);
|
||||
//
|
||||
// fieldTypes.add(builder.build());
|
||||
// });
|
||||
// return fieldTypes;
|
||||
// }
|
||||
}
|
||||
|
||||
@@ -0,0 +1,54 @@
|
||||
/*
|
||||
* Copyright (c) 2024 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapter.qdrant.opdispensers;
|
||||
|
||||
import io.nosqlbench.adapter.qdrant.QdrantDriverAdapter;
|
||||
import io.nosqlbench.adapter.qdrant.ops.QdrantBaseOp;
|
||||
import io.nosqlbench.adapter.qdrant.ops.QdrantPayloadIndexOp;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
import io.qdrant.client.QdrantClient;
|
||||
import io.qdrant.client.grpc.Collections.PayloadIndexParams;
|
||||
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class QdrantCreatePayloadIndexOpDispenser extends QdrantBaseOpDispenser<PayloadIndexParams> {
|
||||
public QdrantCreatePayloadIndexOpDispenser(
|
||||
QdrantDriverAdapter adapter,
|
||||
ParsedOp op,
|
||||
LongFunction<String> targetFunction) {
|
||||
super(adapter, op, targetFunction);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongFunction<PayloadIndexParams> getParamFunc(
|
||||
LongFunction<QdrantClient> clientF,
|
||||
ParsedOp op,
|
||||
LongFunction<String> targetF) {
|
||||
LongFunction<PayloadIndexParams.Builder> ebF =
|
||||
l -> PayloadIndexParams.newBuilder().setField(null, targetF.apply(l));
|
||||
return l -> ebF.apply(l).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongFunction<QdrantBaseOp<PayloadIndexParams>> createOpFunc(
|
||||
LongFunction<PayloadIndexParams> paramF,
|
||||
LongFunction<QdrantClient> clientF,
|
||||
ParsedOp op,
|
||||
LongFunction<String> targetF) {
|
||||
return l -> new QdrantPayloadIndexOp(clientF.apply(l), paramF.apply(l));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,102 @@
|
||||
/*
|
||||
* Copyright (c) 2020-2024 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapter.qdrant.opdispensers;
|
||||
|
||||
import io.nosqlbench.adapter.qdrant.QdrantDriverAdapter;
|
||||
import io.nosqlbench.adapter.qdrant.ops.QdrantBaseOp;
|
||||
import io.nosqlbench.adapter.qdrant.ops.QdrantSearchPointsOp;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
import io.qdrant.client.QdrantClient;
|
||||
import io.qdrant.client.grpc.Points.*;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class QdrantSearchPointsOpDispenser extends QdrantBaseOpDispenser<SearchPoints> {
|
||||
public QdrantSearchPointsOpDispenser(QdrantDriverAdapter adapter, ParsedOp op, LongFunction<String> targetFunction) {
|
||||
super(adapter, op, targetFunction);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongFunction<QdrantBaseOp<SearchPoints>> createOpFunc(
|
||||
LongFunction<SearchPoints> paramF,
|
||||
LongFunction<QdrantClient> clientF,
|
||||
ParsedOp op, LongFunction<String> targetF) {
|
||||
return l -> new QdrantSearchPointsOp(clientF.apply(l), paramF.apply(l));
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongFunction<SearchPoints> getParamFunc(LongFunction<QdrantClient> clientF, ParsedOp op, LongFunction<String> targetF) {
|
||||
LongFunction<SearchPoints.Builder> ebF =
|
||||
l -> SearchPoints.newBuilder().setCollectionName(targetF.apply(l));
|
||||
|
||||
ebF = op.enhanceFuncOptionally(ebF, "limit", Number.class,
|
||||
(SearchPoints.Builder b, Number n) -> b.setLimit(n.longValue()));
|
||||
ebF = op.enhanceFuncOptionally(ebF, "vector_name", String.class, SearchPoints.Builder::setVectorName);
|
||||
ebF = op.enhanceFuncOptionally(ebF, "with_payload", Boolean.class,
|
||||
(SearchPoints.Builder b, Boolean wp) -> b.setWithPayload(WithPayloadSelector.newBuilder().setEnable(wp).build()));
|
||||
ebF = op.enhanceFuncOptionally(ebF, "with_vector", Boolean.class,
|
||||
(SearchPoints.Builder b, Boolean wp) -> b.setWithVectors(WithVectorsSelector.newBuilder().setEnable(wp).build()));
|
||||
ebF = op.enhanceFuncOptionally(ebF, "read_consistency", Number.class,
|
||||
(SearchPoints.Builder b, Number rc) -> b.setReadConsistency(
|
||||
ReadConsistency.newBuilder().setType(ReadConsistencyType.valueOf(rc.intValue())).build()));
|
||||
// ebF = op.enhanceFunc(ebF, List.of("vector_vector", "vectors"), List.class,
|
||||
// (SearchPoints.Builder b, List<Float> vec) -> b.addAllVector(vec));
|
||||
|
||||
Optional<LongFunction<List<Float>>> optionalVectorsF = getVectorFieldsFunction(op, "vector_vector");
|
||||
if(optionalVectorsF.isPresent()) {
|
||||
var rf = optionalVectorsF.get();
|
||||
LongFunction<SearchPoints.Builder> finalF2 = ebF;
|
||||
ebF = l -> finalF2.apply(l).addAllVector(rf.apply(l));
|
||||
}//ccvx .getAsSubOps("vectors", ParsedOp.SubOpNaming.SubKey)
|
||||
// );
|
||||
// final LongFunction<Collections.CreateCollection.Builder> namedVectorsF = ebF;
|
||||
// ebF = l -> namedVectorsF.apply(l).setVectorsConfig(Collections.VectorsConfig.newBuilder().setParams(namedVectorsMap));
|
||||
else {
|
||||
throw new OpConfigError("Must provide values for vectors");
|
||||
}
|
||||
|
||||
final LongFunction<SearchPoints.Builder> lastF = ebF;
|
||||
return l -> lastF.apply(l).build();
|
||||
}
|
||||
|
||||
private Optional<LongFunction<List<Float>>> getVectorFieldsFunction(ParsedOp op, String namedVectors) {
|
||||
// return l -> {
|
||||
// if (!op.isDefined(namedVectors)) {
|
||||
// return Optional.empty();
|
||||
// }
|
||||
// List<Float> fields = op.get(namedVectors, 0L);
|
||||
// if (fields == null) {
|
||||
// fields = op.get(namedVectors, 0L);
|
||||
// }
|
||||
// return fields;
|
||||
// };
|
||||
LongFunction<Object> rowF = op.getAsRequiredFunction(namedVectors, Object.class);
|
||||
Object testObject = rowF.apply(0L);
|
||||
LongFunction<List<Float>> rowsF = null;
|
||||
if(testObject instanceof List<?> list) {
|
||||
if (list.isEmpty()) {
|
||||
throw new OpConfigError("Unable to detect type of list object for empty list for op named '" + op.getName() + "'");
|
||||
} else if (list.get(0) instanceof Float) {
|
||||
rowsF = l -> (List<Float>) rowF.apply(l);
|
||||
}
|
||||
}
|
||||
return Optional.ofNullable(rowsF);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
/*
|
||||
* Copyright (c) 2020-2024 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapter.qdrant.ops;
|
||||
|
||||
import io.qdrant.client.QdrantClient;
|
||||
import io.qdrant.client.grpc.Collections.PayloadIndexParams;
|
||||
|
||||
public class QdrantPayloadIndexOp extends QdrantBaseOp<PayloadIndexParams> {
|
||||
public QdrantPayloadIndexOp(QdrantClient client, PayloadIndexParams request) {
|
||||
super(client, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object applyOp(long value) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Copyright (c) 2024 nosqlbench
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.nosqlbench.adapter.qdrant.ops;
|
||||
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import io.qdrant.client.QdrantClient;
|
||||
import io.qdrant.client.grpc.Points.ScoredPoint;
|
||||
import io.qdrant.client.grpc.Points.SearchPoints;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class QdrantSearchPointsOp extends QdrantBaseOp<SearchPoints> {
|
||||
public QdrantSearchPointsOp(QdrantClient client, SearchPoints request) {
|
||||
super(client, request);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object applyOp(long value) {
|
||||
ListenableFuture<List<ScoredPoint>> result =
|
||||
client.searchAsync(request);
|
||||
return result;
|
||||
}
|
||||
}
|
||||
@@ -19,4 +19,7 @@ package io.nosqlbench.adapter.qdrant.types;
|
||||
public enum QdrantOpType {
|
||||
create_collection,
|
||||
delete_collection,
|
||||
create_payload_index,
|
||||
// https://qdrant.github.io/qdrant/redoc/index.html#tag/points/operation/search_points
|
||||
search_points,
|
||||
}
|
||||
|
||||
@@ -13,11 +13,21 @@ description: |
|
||||
|
||||
scenarios:
|
||||
qdrant_vectors:
|
||||
delete_collection: >-
|
||||
run tags==block:delete_collection
|
||||
errors===stop
|
||||
cycles===UNDEF threads===UNDEF
|
||||
uri=TEMPLATE(qdranthost) token_file=TEMPLATE(token_file)
|
||||
schema_collection: >-
|
||||
run tags==block:schema_collection
|
||||
errors===stop
|
||||
cycles===UNDEF threads===UNDEF
|
||||
uri=TEMPLATE(qdranthost) token_file=TEMPLATE(token_file)
|
||||
search_points: >-
|
||||
run tags==block:search_points
|
||||
errors===stop
|
||||
cycles===TEMPLATE(testann_cycles,1000) threads===TEMPLATE(testann_threads,AUTO)
|
||||
uri=TEMPLATE(qdranthost) token_file=TEMPLATE(token_file)
|
||||
|
||||
params:
|
||||
driver: qdrant
|
||||
@@ -40,14 +50,73 @@ bindings:
|
||||
train_floatlist_fvec_batch: Mul(TEMPLATE(batch_size,10)L); ListSizedStepped(TEMPLATE(batch_size),FVecReader("testdata/TEMPLATE(dataset)_TEMPLATE(trainsize)_base_vectors.fvec",TEMPLATE(dimensions),0));
|
||||
|
||||
blocks:
|
||||
delete_collection:
|
||||
ops:
|
||||
# https://qdrant.github.io/qdrant/redoc/index.html#tag/collections/operation/delete_collection
|
||||
delete_col_op:
|
||||
delete_collection: "TEMPLATE(collection)"
|
||||
|
||||
schema_collection:
|
||||
ops:
|
||||
# https://qdrant.github.io/qdrant/redoc/index.html#tag/collections/operation/create_collection
|
||||
create_col_op:
|
||||
create_collection: "TEMPLATE(collection)"
|
||||
# description: "TEMPLATE(desc,a simple qdrant vector collection)"
|
||||
# consistency_level: "TEMPLATE(write_cl,BOUNDED)"
|
||||
on_disk_payload: true
|
||||
shard_number: 1
|
||||
replication_factor: 1
|
||||
write_consistency_factor: 1
|
||||
vectors:
|
||||
value:
|
||||
size: TEMPLATE(dimensions,25)
|
||||
distance: TEMPLATE(similarity_function,cosine)
|
||||
# https://github.com/qdrant/qdrant/blob/v1.9.0/lib/api/src/grpc/proto/collections.proto#L90-L96
|
||||
# 1 = Cosine, 2 = Euclid, 3 = Dot, 4 = Manhattan, 0 = UnknownDistance
|
||||
distance_value: TEMPLATE(similarity_function,1)
|
||||
on_disk: true
|
||||
# https://github.com/qdrant/qdrant/blob/v1.9.0/lib/api/src/grpc/proto/collections.proto#L5-L9
|
||||
# 0 = Default, 1 = Float32, 2 = Uint8
|
||||
datatype_value: 1
|
||||
hnsw_config:
|
||||
m: 16
|
||||
ef_construct: 100
|
||||
full_scan_threshold: 10000
|
||||
max_indexing_threads: 0
|
||||
on_disk: true
|
||||
#payload_m: 16
|
||||
quantization_config:
|
||||
binary:
|
||||
always_ram: false
|
||||
#scalar:
|
||||
# # https://github.com/qdrant/qdrant/blob/v1.9.0/lib/api/src/grpc/proto/collections.proto#L117-L120
|
||||
# # 0 = UnknownQuantization, 1 = Inet8
|
||||
# type: 1
|
||||
# quantile: 0.99
|
||||
# always_ram: false
|
||||
#product:
|
||||
# compression: x16
|
||||
# always_ram: false
|
||||
wal_config:
|
||||
wal_capacity_mb: 32
|
||||
wal_segments_ahead: 0
|
||||
optimizer_config:
|
||||
deleted_threshold: 0.2
|
||||
vacuum_min_vector_number: 1000
|
||||
default_segment_number: 0
|
||||
indexing_threshold: 20000
|
||||
flush_interval_sec: 5
|
||||
#sparse_vectors:
|
||||
# svec1:
|
||||
# full_scan_threshold: 100
|
||||
# on_disk: true
|
||||
|
||||
search_points:
|
||||
ops:
|
||||
search_points_op:
|
||||
search_points: "TEMPLATE(collection)"
|
||||
vector_name: "value"
|
||||
vector_vector: "{test_floatlist_TEMPLATE(filetype)}"
|
||||
limit: TEMPLATE(select_limit,100)
|
||||
with_payload: true
|
||||
with_vector: true
|
||||
# https://github.com/qdrant/qdrant/blob/v1.9.0/lib/api/src/grpc/proto/points.proto#L21-L25
|
||||
# 0 - All, 1 - Majority, 2 - Quorum
|
||||
read_consistency: 2
|
||||
|
||||
Reference in New Issue
Block a user