mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2024-12-22 23:23:56 -06:00
upsert points operation enhancements
This commit is contained in:
parent
c64fd45f6c
commit
bbdd641318
@ -6,7 +6,7 @@
|
||||
<option name="useCurrentConnection" value="false" />
|
||||
</extension>
|
||||
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
|
||||
<option name="PROGRAM_PARAMETERS" value="qdrant_vectors_live qdrant_vectors.rampup dimensions=25 testsize=10000 trainsize=1183514 train_threads=AUTO train_cycles=5..10 dataset=glove-25-angular filetype=hdf5 collection=glove_25 similarity_function=1 qdranthost=ded78a51-8370-47d8-adb0-6147f0fcbba2.us-east4-0.gcp.cloud.qdrant.io token_file=./apikey grpc_port=6334 --progress console:1s -v --add-labels "dimensions:25,dataset=glove-25" --show-stacktraces --logs-max 5" />
|
||||
<option name="PROGRAM_PARAMETERS" value="qdrant_vectors_live qdrant_vectors.rampup dimensions=25 testsize=10000 trainsize=1183514 dataset=glove-25-angular filetype=hdf5 collection=glove_25 similarity_function=1 qdranthost=ded78a51-8370-47d8-adb0-6147f0fcbba2.us-east4-0.gcp.cloud.qdrant.io token_file=./apikey grpc_port=6334 --progress console:1s -v --add-labels "dimensions:25,dataset=glove-25" --show-stacktraces --logs-max 5" />
|
||||
<option name="WORKING_DIRECTORY" value="$ProjectFileDir$/local/qdrant" />
|
||||
<option name="ALTERNATIVE_JRE_PATH" value="jdk21" />
|
||||
<method v="2" />
|
||||
|
@ -21,23 +21,19 @@ import io.nosqlbench.adapter.qdrant.ops.QdrantBaseOp;
|
||||
import io.nosqlbench.adapter.qdrant.ops.QdrantUpsertPointsOp;
|
||||
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
import io.qdrant.client.QdrantClient;
|
||||
import io.qdrant.client.ValueFactory;
|
||||
import io.qdrant.client.VectorFactory;
|
||||
import io.qdrant.client.VectorsFactory;
|
||||
import io.qdrant.client.grpc.Collections;
|
||||
import io.nosqlbench.nb.api.errors.OpConfigError;
|
||||
import io.qdrant.client.*;
|
||||
import io.qdrant.client.grpc.JsonWithInt.ListValue;
|
||||
import io.qdrant.client.grpc.JsonWithInt.NullValue;
|
||||
import io.qdrant.client.grpc.JsonWithInt.Struct;
|
||||
import io.qdrant.client.grpc.JsonWithInt.Value;
|
||||
import io.qdrant.client.grpc.Points.Vector;
|
||||
import io.qdrant.client.grpc.Points.*;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
import java.util.function.BiConsumer;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class QdrantUpsertPointsOpDispenser extends QdrantBaseOpDispenser<UpsertPoints> {
|
||||
@ -65,92 +61,119 @@ public class QdrantUpsertPointsOpDispenser extends QdrantBaseOpDispenser<UpsertP
|
||||
|
||||
// set wait and ordering query params
|
||||
ebF = op.enhanceFuncOptionally(ebF, "wait", Boolean.class, UpsertPoints.Builder::setWait);
|
||||
WriteOrdering.Builder writeOrdering = WriteOrdering.newBuilder();
|
||||
op.getOptionalStaticValue("ordering", Number.class)
|
||||
.ifPresent((Number ordering) -> {
|
||||
writeOrdering.setType(WriteOrderingType.forNumber(ordering.intValue()));
|
||||
});
|
||||
final LongFunction<UpsertPoints.Builder> orderingF = ebF;
|
||||
ebF = l -> orderingF.apply(l).setOrdering(writeOrdering);
|
||||
ebF = op.enhanceFuncOptionally(ebF, "ordering", Number.class, (UpsertPoints.Builder b, Number n) ->
|
||||
b.setOrdering(WriteOrdering.newBuilder().setType(WriteOrderingType.forNumber(n.intValue()))));
|
||||
|
||||
// request body begins here
|
||||
ShardKeySelector.Builder shardKeySelector = ShardKeySelector.newBuilder();
|
||||
op.getOptionalStaticValue("shard_key", Number.class)
|
||||
.ifPresent((Number value) -> {
|
||||
shardKeySelector.setShardKeys(0, Collections.ShardKey.newBuilder().setNumber(value.longValue()));
|
||||
});
|
||||
|
||||
List<PointStruct> allPoints = buildPointsStructWithNamedVectors(op);
|
||||
ebF = op.enhanceFuncOptionally(ebF, "shard_key", String.class, (UpsertPoints.Builder b, String sk) ->
|
||||
b.setShardKeySelector(ShardKeySelectorFactory.shardKeySelector(sk)));
|
||||
LongFunction<List<PointStruct>> pointsF = constructVectorPointsFunc(op);
|
||||
final LongFunction<UpsertPoints.Builder> pointsOfNamedVectorsF = ebF;
|
||||
ebF = l -> pointsOfNamedVectorsF.apply(l).addAllPoints(allPoints);
|
||||
ebF = l -> pointsOfNamedVectorsF.apply(l).addAllPoints(pointsF.apply(l));
|
||||
|
||||
final LongFunction<UpsertPoints.Builder> lastF = ebF;
|
||||
return l -> lastF.apply(l).build();
|
||||
}
|
||||
|
||||
private List<PointStruct> buildPointsStructWithNamedVectors(ParsedOp op) {
|
||||
List<PointStruct> allPoints = new ArrayList<>();
|
||||
PointStruct.Builder pointBuilder = PointStruct.newBuilder();
|
||||
|
||||
PointId.Builder pointId = PointId.newBuilder();
|
||||
// id is mandatory
|
||||
Object idObject = op.getAsRequiredFunction("id", Object.class).apply(0L);
|
||||
if (idObject instanceof Number) {
|
||||
pointId.setNum(((Number) idObject).longValue());
|
||||
} else if (idObject instanceof String) {
|
||||
pointId.setUuid((String) idObject);
|
||||
}
|
||||
pointBuilder.setId(pointId);
|
||||
|
||||
if (op.isDefined("payload")) {
|
||||
LongFunction<Map> payloadMapF = op.getAsRequiredFunction("payload", Map.class);
|
||||
Map<String, Value> payloadMapData = new HashMap<>();
|
||||
payloadMapF.apply(0L).forEach((pKey, pVal) -> {
|
||||
if(pVal instanceof Boolean) {
|
||||
payloadMapData.put((String) pKey, ValueFactory.value((Boolean) pVal));
|
||||
} else if(pVal instanceof Double) {
|
||||
payloadMapData.put((String) pKey, ValueFactory.value((Double) pVal));
|
||||
} else if(pVal instanceof Integer) {
|
||||
payloadMapData.put((String) pKey, ValueFactory.value((Integer) pVal));
|
||||
} else if(pVal instanceof String) {
|
||||
payloadMapData.put((String) pKey, ValueFactory.value((String) pVal));
|
||||
} else if(pVal instanceof ListValue) {
|
||||
payloadMapData.put((String) pKey, ValueFactory.list((List<Value>) pVal));
|
||||
} else if(pVal instanceof NullValue) {
|
||||
payloadMapData.put((String) pKey, ValueFactory.nullValue());
|
||||
} else if(pVal instanceof Struct) {
|
||||
payloadMapData.put((String) pKey, Value.newBuilder().setStructValue((Struct) pVal).build());
|
||||
/**
|
||||
* @param op the {@link ParsedOp} from which the vector objects will be built
|
||||
* @return an Iterable Collection of {@link PointStruct} objects to be added to a Qdrant {@link UpsertPoints} request.
|
||||
* <p>
|
||||
* This method interrogates the subsection of the ParsedOp defined for vector parameters and constructs a list of
|
||||
* vector (dense plus sparse) points based on the included values, or returns null if this section is not populated.
|
||||
* The base function returns either the List of vectors or null, while the interior function builds the vectors
|
||||
* with a Builder pattern based on the values contained in the source ParsedOp.
|
||||
*/
|
||||
private LongFunction<List<PointStruct>> constructVectorPointsFunc(ParsedOp op) {
|
||||
Optional<LongFunction<List>> baseFunc =
|
||||
op.getAsOptionalFunction("points", List.class);
|
||||
return baseFunc.<LongFunction<List<PointStruct>>>map(listLongFunction -> l -> {
|
||||
List<PointStruct> returnVectorPoints = new ArrayList<>();
|
||||
List<Map<String, Object>> vectorPoints = listLongFunction.apply(l);
|
||||
PointStruct.Builder pointBuilder;
|
||||
for (Map<String, Object> point : vectorPoints) {
|
||||
pointBuilder = PointStruct.newBuilder();
|
||||
// 'id' field is mandatory, if not present, server will throw an exception
|
||||
PointId.Builder pointId = PointId.newBuilder();
|
||||
if (point.get("id") instanceof Number) {
|
||||
pointId.setNum(((Number) point.get("id")).longValue());
|
||||
} else if (point.get("id") instanceof String) {
|
||||
pointId.setUuid((String) point.get("id"));
|
||||
} else {
|
||||
logger.warn("Unknown payload type passed." +
|
||||
logger.warn("Unsupported 'id' value type [{}] specified for 'points'. Ignoring.",
|
||||
point.get("id").getClass().getSimpleName());
|
||||
}
|
||||
pointBuilder.setId(pointId);
|
||||
if (point.containsKey("payload")) {
|
||||
pointBuilder.putAllPayload(getPayloadValues(point.get("payload")));
|
||||
}
|
||||
pointBuilder.setVectors(VectorsFactory.namedVectors(getNamedVectorMap(point.get("vector"))));
|
||||
returnVectorPoints.add(pointBuilder.build());
|
||||
}
|
||||
return returnVectorPoints;
|
||||
}).orElse(null);
|
||||
}
|
||||
|
||||
private Map<String, Vector> getNamedVectorMap(Object rawVectorValues) {
|
||||
Map<String, Vector> namedVectorMapData;
|
||||
if (rawVectorValues instanceof Map) {
|
||||
namedVectorMapData = new HashMap<>();
|
||||
List<Float> sparseVectors = new ArrayList<>();
|
||||
List<Integer> sparseIndices = new ArrayList<>();
|
||||
BiConsumer<String, Object> namedVectorsToPointsVectorValue = (nvkey, nvVal) -> {
|
||||
Vector targetVectorVal;
|
||||
if (nvVal instanceof Map) {
|
||||
// Deal with named sparse vectors here
|
||||
((Map<String, Object>) nvVal).forEach(
|
||||
(svKey, svValue) -> {
|
||||
if ("values".equals(svKey)) {
|
||||
sparseVectors.addAll((List<Float>) svValue);
|
||||
} else if ("indices".equals(svKey)) {
|
||||
sparseIndices.addAll((List<Integer>) svValue);
|
||||
} else {
|
||||
logger.warn("Unrecognized sparse vector field [{}] provided. Ignoring.", svKey);
|
||||
}
|
||||
}
|
||||
);
|
||||
targetVectorVal = VectorFactory.vector(sparseVectors, sparseIndices);
|
||||
} else if (nvVal instanceof List) {
|
||||
// Deal with regular named dense vectors here
|
||||
targetVectorVal = VectorFactory.vector((List<Float>) nvVal);
|
||||
} else
|
||||
throw new RuntimeException("Unsupported 'vector' value type [" + nvVal.getClass().getSimpleName() + " ]");
|
||||
namedVectorMapData.put(nvkey, targetVectorVal);
|
||||
};
|
||||
((Map<String, Object>) rawVectorValues).forEach(namedVectorsToPointsVectorValue);
|
||||
} else {
|
||||
throw new OpConfigError("Invalid format of type" +
|
||||
" [" + rawVectorValues.getClass().getSimpleName() + "] specified for 'vector'");
|
||||
}
|
||||
return namedVectorMapData;
|
||||
}
|
||||
|
||||
private Map<String, Value> getPayloadValues(Object rawPayloadValues) {
|
||||
if (rawPayloadValues instanceof Map) {
|
||||
Map<String, Object> payloadMap = (Map<String, Object>) rawPayloadValues;
|
||||
Map<String, Value> payloadMapData = new HashMap<>(payloadMap.size());
|
||||
payloadMap.forEach((pKey, pVal) -> {
|
||||
switch (pVal) {
|
||||
case Boolean b -> payloadMapData.put(pKey, ValueFactory.value(b));
|
||||
case Double v -> payloadMapData.put(pKey, ValueFactory.value(v));
|
||||
case Integer i -> payloadMapData.put(pKey, ValueFactory.value(i));
|
||||
case String s -> payloadMapData.put(pKey, ValueFactory.value(s));
|
||||
case ListValue listValue -> payloadMapData.put(pKey, ValueFactory.list((List<Value>) pVal));
|
||||
case NullValue nullValue -> payloadMapData.put(pKey, ValueFactory.nullValue());
|
||||
case Struct struct -> payloadMapData.put(pKey, Value.newBuilder().setStructValue(struct).build());
|
||||
default -> logger.warn("Unknown payload value type passed." +
|
||||
" Only https://qdrant.tech/documentation/concepts/payload/#payload-types are supported." +
|
||||
" {} will be inored.", pVal.toString());
|
||||
" {} will be ignored.", pVal.toString());
|
||||
}
|
||||
});
|
||||
pointBuilder.putAllPayload(payloadMapData);
|
||||
return payloadMapData;
|
||||
} else {
|
||||
throw new RuntimeException("Invalid format of type" +
|
||||
" [" + rawPayloadValues.getClass().getSimpleName() + "] specified for payload");
|
||||
}
|
||||
|
||||
LongFunction<Map> namedVectorMapF = op.getAsRequiredFunction("vector", Map.class);
|
||||
Map<String, Vector> namedVectorMapData = new HashMap<>();
|
||||
List<Float> sparseVectors = new ArrayList<>();
|
||||
List<Integer> sparseIndices = new ArrayList<>();
|
||||
namedVectorMapF.apply(0L).forEach((nvKey, nvVal) -> {
|
||||
if (nvVal instanceof Map) {
|
||||
// we deal with named sparse vectors here
|
||||
Map<String, Object> namedSparseVectorsMap = (Map<String, Object>) nvVal;
|
||||
if (namedSparseVectorsMap.containsKey("indices") && namedSparseVectorsMap.containsKey("values")) {
|
||||
sparseVectors.addAll((List<Float>) namedSparseVectorsMap.get("values"));
|
||||
sparseIndices.addAll((List<Integer>) namedSparseVectorsMap.get("indices"));
|
||||
}
|
||||
namedVectorMapData.put((String) nvKey, VectorFactory.vector(sparseVectors, sparseIndices));
|
||||
} else {
|
||||
// Deal with regular named dense vectors here
|
||||
namedVectorMapData.put((String) nvKey, VectorFactory.vector((List<Float>) nvVal));
|
||||
}
|
||||
});
|
||||
pointBuilder.setVectors(VectorsFactory.namedVectors(namedVectorMapData));
|
||||
allPoints.add(pointBuilder.build());
|
||||
|
||||
return allPoints;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -33,15 +33,19 @@ public class QdrantUpsertPointsOp extends QdrantBaseOp<UpsertPoints> {
|
||||
String responseStatus;
|
||||
long responseOperationId;
|
||||
try {
|
||||
logger.debug("[QdrantUpsertPointsOp] Cycle {} has Request: {}", value, request.toString());
|
||||
response = client.upsertAsync(request).get();
|
||||
responseStatus = response.getStatus().toString();
|
||||
responseOperationId = response.getOperationId();
|
||||
switch(response.getStatus()) {
|
||||
case Completed, Acknowledged -> logger.trace("Upsert points finished successfully." +
|
||||
case Completed, Acknowledged ->
|
||||
logger.trace("[QdrantUpsertPointsOp] Upsert points finished successfully." +
|
||||
" [Status ({}) for Operation id ({})]", responseStatus, responseOperationId);
|
||||
case UnknownUpdateStatus, ClockRejected -> logger.error("Upsert points failed with status '{}'" +
|
||||
case UnknownUpdateStatus, ClockRejected ->
|
||||
logger.error("[QdrantUpsertPointsOp] Upsert points failed with status '{}'" +
|
||||
" for operation id '{}'", responseStatus, responseOperationId);
|
||||
default -> logger.error("Unknown status '{}' for operation id '{}'", responseStatus, responseOperationId);
|
||||
default ->
|
||||
logger.error("[QdrantUpsertPointsOp] Unknown status '{}' for operation id '{}'", responseStatus, responseOperationId);
|
||||
}
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
|
@ -25,7 +25,7 @@ scenarios:
|
||||
uri=TEMPLATE(qdranthost) token_file=TEMPLATE(token_file)
|
||||
rampup: >-
|
||||
run tags==block:rampup
|
||||
errors===warn,counter
|
||||
errors===warn,counter,retry
|
||||
cycles===TEMPLATE(train_cycles,TEMPLATE(trainsize,1000)) threads===TEMPLATE(train_threads,AUTO)
|
||||
uri=TEMPLATE(qdranthost) token_file=TEMPLATE(token_file)
|
||||
count_vectors: >-
|
||||
@ -59,8 +59,6 @@ bindings:
|
||||
distance_floatlist_fvec: FVecReader("testdata/TEMPLATE(dataset)_TEMPLATE(testsize)_distances_count.fvec",TEMPLATE(dimensions),0);
|
||||
train_floatlist_fvec: FVecReader("testdata/TEMPLATE(dataset)_TEMPLATE(trainsize)_base_vectors.fvec",TEMPLATE(dimensions),0);
|
||||
train_floatlist_fvec_batch: Mul(TEMPLATE(batch_size,10)L); ListSizedStepped(TEMPLATE(batch_size),FVecReader("testdata/TEMPLATE(dataset)_TEMPLATE(trainsize)_base_vectors.fvec",TEMPLATE(dimensions),0));
|
||||
test_2d_fl_vecs: HashedFloatVectors(2)
|
||||
list_sized: ListSized(HashaRange(2,2), Identity(), Identity())
|
||||
|
||||
blocks:
|
||||
delete_collection:
|
||||
@ -130,17 +128,17 @@ blocks:
|
||||
# 0 - Weak, 1 - Medium, 2 - Strong
|
||||
ordering: TEMPLATE(upsert_point_ordering,1)
|
||||
#shard_key: "{row_key}"
|
||||
id: "{id_val}"
|
||||
payload:
|
||||
key: "{row_key}"
|
||||
vector:
|
||||
# For dense vectors, use the below format
|
||||
value: "{train_floatlist_TEMPLATE(filetype)}"
|
||||
# For sparse vectors, use the below format
|
||||
#value_sv:
|
||||
# indices: your array of numbers
|
||||
# values: your array of floats
|
||||
|
||||
points:
|
||||
- id: "{id_val}"
|
||||
payload:
|
||||
key: "{row_key}"
|
||||
vector:
|
||||
# For dense vectors, use the below format
|
||||
value: "{train_floatlist_TEMPLATE(filetype)}"
|
||||
# For sparse vectors, use the below format
|
||||
#value_sv:
|
||||
# indices: your array of numbers
|
||||
# values: your array of floats
|
||||
|
||||
search_points:
|
||||
ops:
|
||||
|
Loading…
Reference in New Issue
Block a user