misc async->sync operations

This commit is contained in:
Madhavan Sridharan 2024-05-07 06:39:34 -04:00
parent c74a2acd94
commit 7aacbb0c50
12 changed files with 299 additions and 24 deletions

View File

@ -0,0 +1,14 @@
<component name="ProjectRunConfigurationManager">
<!--
  Run configuration of type JarApplication, filed under the "Qdrant" folder.
  It launches the nb5.jar named in JAR_PATH with the program parameters
  "qdrant_vectors_live qdrant_vectors.rampup ..." for the glove-25-angular dataset
  (dimensions=25), using ./apikey as the token file relative to WORKING_DIRECTORY.
  NOTE(review): qdranthost is a hard-coded endpoint; presumably specific to one
  developer's cluster. Confirm it should be shared in version control.
  NOTE(review): the add-labels argument mixes "dimensions:25" and "dataset=glove-25"
  separators. Confirm that both forms are accepted.
  NOTE(review): JAR_PATH uses $PROJECT_DIR$ while WORKING_DIRECTORY uses
  $ProjectFileDir$. Confirm that both macros resolve as intended.
-->
<configuration default="false" name="qdrant_upsert_points_glove_25" type="JarApplication" folderName="Qdrant">
<extension name="software.aws.toolkits.jetbrains.core.execution.JavaAwsConnectionExtension">
<option name="credential" />
<option name="region" />
<option name="useCurrentConnection" value="false" />
</extension>
<option name="JAR_PATH" value="$PROJECT_DIR$/nb5/target/nb5.jar" />
<option name="PROGRAM_PARAMETERS" value="qdrant_vectors_live qdrant_vectors.rampup dimensions=25 testsize=10000 trainsize=1183514 train_threads=AUTO train_cycles=5..10 dataset=glove-25-angular filetype=hdf5 collection=glove_25 similarity_function=1 qdranthost=ded78a51-8370-47d8-adb0-6147f0fcbba2.us-east4-0.gcp.cloud.qdrant.io token_file=./apikey grpc_port=6334 --progress console:1s -v --add-labels &quot;dimensions:25,dataset=glove-25&quot; --show-stacktraces --logs-max 5" />
<option name="WORKING_DIRECTORY" value="$ProjectFileDir$/local/qdrant" />
<option name="ALTERNATIVE_JRE_PATH" value="jdk21" />
<method v="2" />
</configuration>
</component>

View File

@ -61,6 +61,7 @@ public class QdrantOpMapper implements OpMapper<QdrantBaseOp<?>> {
case create_payload_index ->
new QdrantCreatePayloadIndexOpDispenser(adapter, op, typeAndTarget.targetFunction);
case search_points -> new QdrantSearchPointsOpDispenser(adapter, op, typeAndTarget.targetFunction);
case upsert_points -> new QdrantUpsertPointsOpDispenser(adapter, op, typeAndTarget.targetFunction);
// default -> throw new RuntimeException("Unrecognized op type '" + typeAndTarget.enumId.name() + "' while " +
// "mapping parsed op " + op);
};

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2024 nosqlbench
* Copyright (c) 2020-2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.

View File

@ -42,7 +42,10 @@ public class QdrantSearchPointsOpDispenser extends QdrantBaseOpDispenser<SearchP
}
@Override
public LongFunction<SearchPoints> getParamFunc(LongFunction<QdrantClient> clientF, ParsedOp op, LongFunction<String> targetF) {
public LongFunction<SearchPoints> getParamFunc(
LongFunction<QdrantClient> clientF,
ParsedOp op,
LongFunction<String> targetF) {
LongFunction<SearchPoints.Builder> ebF =
l -> SearchPoints.newBuilder().setCollectionName(targetF.apply(l));
@ -55,7 +58,7 @@ public class QdrantSearchPointsOpDispenser extends QdrantBaseOpDispenser<SearchP
(SearchPoints.Builder b, Boolean wp) -> b.setWithVectors(WithVectorsSelector.newBuilder().setEnable(wp).build()));
ebF = op.enhanceFuncOptionally(ebF, "read_consistency", Number.class,
(SearchPoints.Builder b, Number rc) -> b.setReadConsistency(
ReadConsistency.newBuilder().setType(ReadConsistencyType.valueOf(rc.intValue())).build()));
ReadConsistency.newBuilder().setType(ReadConsistencyType.forNumber(rc.intValue())).build()));
// ebF = op.enhanceFunc(ebF, List.of("vector_vector", "vectors"), List.class,
// (SearchPoints.Builder b, List<Float> vec) -> b.addAllVector(vec));

View File

@ -0,0 +1,169 @@
/*
* Copyright (c) 2020-2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.qdrant.opdispensers;
import io.nosqlbench.adapter.qdrant.QdrantDriverAdapter;
import io.nosqlbench.adapter.qdrant.ops.QdrantBaseOp;
import io.nosqlbench.adapter.qdrant.ops.QdrantUpsertPointsOp;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.qdrant.client.QdrantClient;
import io.qdrant.client.ValueFactory;
import io.qdrant.client.VectorFactory;
import io.qdrant.client.VectorsFactory;
import io.qdrant.client.grpc.Collections;
import io.qdrant.client.grpc.JsonWithInt.ListValue;
import io.qdrant.client.grpc.JsonWithInt.NullValue;
import io.qdrant.client.grpc.JsonWithInt.Struct;
import io.qdrant.client.grpc.JsonWithInt.Value;
import io.qdrant.client.grpc.Points.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.LongFunction;
/**
 * Op dispenser that turns an {@code upsert_points} op template into {@link UpsertPoints}
 * requests and wraps each one in a {@link QdrantUpsertPointsOp}.
 * <p>
 * The {@code id}, {@code payload} and {@code vector} fields are resolved once into functions
 * and then evaluated for every cycle, so bindings such as {@code "{id_val}"} yield a
 * different point per cycle.
 */
public class QdrantUpsertPointsOpDispenser extends QdrantBaseOpDispenser<UpsertPoints> {
    private static final Logger logger = LogManager.getLogger(QdrantUpsertPointsOpDispenser.class);

    /**
     * Create a new {@link QdrantUpsertPointsOpDispenser} implementing the {@link OpDispenser} interface.
     *
     * @param adapter        the driver adapter, passed through to the base dispenser
     * @param op             the parsed op template, passed through to the base dispenser
     * @param targetFunction per-cycle function whose result is used as the collection name
     * @see <a href="https://qdrant.github.io/qdrant/redoc/index.html#tag/points/operation/upsert_points">Upsert Points</a>
     */
    public QdrantUpsertPointsOpDispenser(QdrantDriverAdapter adapter, ParsedOp op, LongFunction<String> targetFunction) {
        super(adapter, op, targetFunction);
    }

    /**
     * Build the per-cycle function that produces the {@link UpsertPoints} request.
     * <p>
     * Fields read from the op template: {@code wait} (optional), {@code ordering} (optional,
     * static number), {@code shard_key} (optional, static number), {@code id} (required),
     * {@code payload} (optional map) and {@code vector} (required map of named vectors).
     *
     * @param clientF per-cycle client function (not used while building the request)
     * @param op      the parsed op template to read fields from
     * @param targetF per-cycle function yielding the collection name
     * @return a function mapping a cycle number to a fully built {@link UpsertPoints} request
     * @throws IllegalArgumentException if {@code ordering} is not a known {@link WriteOrderingType} number
     */
    @Override
    public LongFunction<UpsertPoints> getParamFunc(
        LongFunction<QdrantClient> clientF,
        ParsedOp op,
        LongFunction<String> targetF
    ) {
        LongFunction<UpsertPoints.Builder> ebF =
            l -> UpsertPoints.newBuilder().setCollectionName(targetF.apply(l));

        // set wait and ordering query params
        ebF = op.enhanceFuncOptionally(ebF, "wait", Boolean.class, UpsertPoints.Builder::setWait);

        WriteOrdering.Builder writeOrdering = WriteOrdering.newBuilder();
        op.getOptionalStaticValue("ordering", Number.class)
            .ifPresent((Number ordering) -> {
                WriteOrderingType orderingType = WriteOrderingType.forNumber(ordering.intValue());
                if (orderingType == null) {
                    // forNumber yields null for unknown numbers; fail with a clear message
                    // instead of a NullPointerException from the builder.
                    throw new IllegalArgumentException(
                        "Unsupported 'ordering' value '" + ordering + "' for upsert_points");
                }
                writeOrdering.setType(orderingType);
            });
        // Build once; the same immutable message is reused for every cycle.
        final WriteOrdering writeOrderingValue = writeOrdering.build();
        final LongFunction<UpsertPoints.Builder> orderingF = ebF;
        ebF = l -> orderingF.apply(l).setOrdering(writeOrderingValue);

        // request body begins here
        ShardKeySelector.Builder shardKeySelector = ShardKeySelector.newBuilder();
        op.getOptionalStaticValue("shard_key", Number.class)
            .ifPresent((Number value) ->
                // add (not set-at-index): the repeated field is empty at this point
                shardKeySelector.addShardKeys(Collections.ShardKey.newBuilder().setNumber(value.longValue())));
        if (shardKeySelector.getShardKeysCount() > 0) {
            final ShardKeySelector selector = shardKeySelector.build();
            final LongFunction<UpsertPoints.Builder> shardKeyF = ebF;
            ebF = l -> shardKeyF.apply(l).setShardKeySelector(selector);
        }

        final LongFunction<List<PointStruct>> pointsF = buildPointsStructWithNamedVectors(op);
        final LongFunction<UpsertPoints.Builder> pointsOfNamedVectorsF = ebF;
        ebF = l -> pointsOfNamedVectorsF.apply(l).addAllPoints(pointsF.apply(l));

        final LongFunction<UpsertPoints.Builder> lastF = ebF;
        return l -> lastF.apply(l).build();
    }

    /**
     * Resolve the {@code id}, {@code payload} and {@code vector} fields into functions once and
     * return a function that assembles the point(s) for a given cycle.
     *
     * @param op the parsed op template
     * @return a function mapping a cycle number to the list of points to upsert (currently one point)
     */
    private LongFunction<List<PointStruct>> buildPointsStructWithNamedVectors(ParsedOp op) {
        // id is mandatory
        final LongFunction<?> idF = op.getAsRequiredFunction("id", Object.class);
        final LongFunction<Map> payloadF =
            op.isDefined("payload") ? op.getAsRequiredFunction("payload", Map.class) : null;
        final LongFunction<Map> namedVectorMapF = op.getAsRequiredFunction("vector", Map.class);

        return l -> {
            PointStruct.Builder pointBuilder = PointStruct.newBuilder();
            pointBuilder.setId(toPointId(idF.apply(l)));
            if (payloadF != null) {
                pointBuilder.putAllPayload(toPayload(payloadF.apply(l)));
            }
            pointBuilder.setVectors(VectorsFactory.namedVectors(toNamedVectors(namedVectorMapF.apply(l))));
            return List.of(pointBuilder.build());
        };
    }

    /**
     * Convert a resolved {@code id} value into a {@link PointId}.
     *
     * @param idObject a {@link Number} (numeric id) or a {@link String} (uuid)
     * @return the corresponding point id
     * @throws IllegalArgumentException if the value is neither a number nor a string
     */
    private static PointId toPointId(Object idObject) {
        if (idObject instanceof Number number) {
            return PointId.newBuilder().setNum(number.longValue()).build();
        }
        if (idObject instanceof String uuid) {
            return PointId.newBuilder().setUuid(uuid).build();
        }
        throw new IllegalArgumentException("The 'id' field must resolve to a number or a uuid string, but was "
            + (idObject == null ? "null" : idObject.getClass().getSimpleName()));
    }

    /**
     * Convert a resolved payload map into Qdrant payload values. Entries whose value type is
     * not supported are skipped with a warning.
     *
     * @param rawPayload the payload map as resolved from the op template
     * @return payload entries keyed by field name
     */
    private static Map<String, Value> toPayload(Map<?, ?> rawPayload) {
        Map<String, Value> payloadMapData = new HashMap<>();
        rawPayload.forEach((pKey, pVal) -> {
            Value converted = toPayloadValue(pVal);
            if (converted != null) {
                payloadMapData.put((String) pKey, converted);
            } else {
                logger.warn("Unknown payload type passed." +
                    " Only https://qdrant.tech/documentation/concepts/payload/#payload-types are supported." +
                    " {} will be ignored.", pVal);
            }
        });
        return payloadMapData;
    }

    /**
     * Convert a single payload value.
     *
     * @param raw the raw value; may be {@code null}
     * @return the converted value, or {@code null} when the type is not supported
     */
    private static Value toPayloadValue(Object raw) {
        if (raw == null || raw instanceof NullValue) {
            return ValueFactory.nullValue();
        }
        if (raw instanceof Value value) {
            return value;
        }
        if (raw instanceof Boolean bool) {
            return ValueFactory.value(bool);
        }
        if (raw instanceof Double || raw instanceof Float) {
            return ValueFactory.value(((Number) raw).doubleValue());
        }
        if (raw instanceof Long || raw instanceof Integer || raw instanceof Short || raw instanceof Byte) {
            return ValueFactory.value(((Number) raw).longValue());
        }
        if (raw instanceof String text) {
            return ValueFactory.value(text);
        }
        if (raw instanceof ListValue listValue) {
            // ListValue is a protobuf message, not a java.util.List, so it is wrapped directly
            return Value.newBuilder().setListValue(listValue).build();
        }
        if (raw instanceof Struct struct) {
            return Value.newBuilder().setStructValue(struct).build();
        }
        return null;
    }

    /**
     * Convert the resolved {@code vector} map into named vectors. A nested map is treated as a
     * sparse vector (keys {@code indices} and {@code values}); a list is treated as a dense vector.
     *
     * @param rawVectors map of vector name to vector data
     * @return named vectors keyed by vector name
     * @throws IllegalArgumentException if a vector value is neither a map nor a list
     */
    @SuppressWarnings("unchecked") // element types come from the op template and cannot be checked here
    private static Map<String, Vector> toNamedVectors(Map<?, ?> rawVectors) {
        Map<String, Vector> namedVectorMapData = new HashMap<>();
        rawVectors.forEach((nvKey, nvVal) -> {
            if (nvVal instanceof Map<?, ?> namedSparseVectorsMap) {
                // we deal with named sparse vectors here; the lists are local to each named
                // vector so that one vector's data never leaks into the next one
                boolean complete =
                    namedSparseVectorsMap.containsKey("indices") && namedSparseVectorsMap.containsKey("values");
                // when either key is missing an empty sparse vector is stored
                List<Float> sparseValues = complete ? (List<Float>) namedSparseVectorsMap.get("values") : List.of();
                List<Integer> sparseIndices = complete ? (List<Integer>) namedSparseVectorsMap.get("indices") : List.of();
                namedVectorMapData.put((String) nvKey, VectorFactory.vector(sparseValues, sparseIndices));
            } else if (nvVal instanceof List<?>) {
                // Deal with regular named dense vectors here
                namedVectorMapData.put((String) nvKey, VectorFactory.vector((List<Float>) nvVal));
            } else {
                throw new IllegalArgumentException("Unsupported 'vector' value type for '" + nvKey + "': "
                    + (nvVal == null ? "null" : nvVal.getClass().getSimpleName()));
            }
        });
        return namedVectorMapData;
    }

    /**
     * Create a new {@link QdrantUpsertPointsOp} implementing the {@link QdrantBaseOp} interface.
     *
     * @param paramF  per-cycle function producing the request
     * @param clientF per-cycle function producing the client
     * @param op      the parsed op template (unused here)
     * @param targetF per-cycle collection name function (unused here)
     * @return a function mapping a cycle number to the op to execute
     * @see <a href="https://qdrant.tech/documentation/concepts/points/">Upsert Points</a>
     */
    @Override
    public LongFunction<QdrantBaseOp<UpsertPoints>> createOpFunc(
        LongFunction<UpsertPoints> paramF,
        LongFunction<QdrantClient> clientF,
        ParsedOp op,
        LongFunction<String> targetF
    ) {
        return l -> new QdrantUpsertPointsOp(clientF.apply(l), paramF.apply(l));
    }
}

View File

@ -16,12 +16,13 @@
package io.nosqlbench.adapter.qdrant.ops;
import com.google.common.util.concurrent.ListenableFuture;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.qdrant.client.QdrantClient;
import io.qdrant.client.grpc.Collections.CollectionOperationResponse;
import io.qdrant.client.grpc.Collections.CreateCollection;
import java.util.concurrent.ExecutionException;
public class QdrantCreateCollectionOp extends QdrantBaseOp<CreateCollection> {
/**
* Create a new {@link ParsedOp} encapsulating a call to the <b>Qdrant</b> create collection method.
@ -35,19 +36,12 @@ public class QdrantCreateCollectionOp extends QdrantBaseOp<CreateCollection> {
@Override
public Object applyOp(long value) {
// ListenableFuture<CollectionOperationResponse> response = client.createCollectionAsync(
// CreateCollection.newBuilder()
// .setCollectionName("test")
// .setVectorsConfig(VectorsConfig.newBuilder()
// .setParams(
// VectorParams.newBuilder()
// .setDistance(Distance.Cosine)
// .setSize(25)
// .build()
// ).build()
// ).build()
// );
ListenableFuture<CollectionOperationResponse> response = client.createCollectionAsync(request);
CollectionOperationResponse response = null;
try {
response = client.createCollectionAsync(request).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return response;
}
}

View File

@ -16,11 +16,12 @@
package io.nosqlbench.adapter.qdrant.ops;
import com.google.common.util.concurrent.ListenableFuture;
import io.qdrant.client.QdrantClient;
import io.qdrant.client.grpc.Collections.CollectionOperationResponse;
import io.qdrant.client.grpc.Collections.DeleteCollection;
import java.util.concurrent.ExecutionException;
public class QdrantDeleteCollectionOp extends QdrantBaseOp<DeleteCollection> {
public QdrantDeleteCollectionOp(QdrantClient client, DeleteCollection request) {
super(client, request);
@ -28,7 +29,12 @@ public class QdrantDeleteCollectionOp extends QdrantBaseOp<DeleteCollection> {
@Override
public Object applyOp(long value) {
ListenableFuture<CollectionOperationResponse> response = client.deleteCollectionAsync(request.getCollectionName());
CollectionOperationResponse response = null;
try {
response = client.deleteCollectionAsync(request.getCollectionName()).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return response;
}

View File

@ -26,6 +26,7 @@ public class QdrantPayloadIndexOp extends QdrantBaseOp<PayloadIndexParams> {
@Override
public Object applyOp(long value) {
//client.createPayloadIndexAsync(PayloadIndexParams.get);
return null;
}
}

View File

@ -16,12 +16,12 @@
package io.nosqlbench.adapter.qdrant.ops;
import com.google.common.util.concurrent.ListenableFuture;
import io.qdrant.client.QdrantClient;
import io.qdrant.client.grpc.Points.ScoredPoint;
import io.qdrant.client.grpc.Points.SearchPoints;
import java.util.List;
import java.util.concurrent.ExecutionException;
public class QdrantSearchPointsOp extends QdrantBaseOp<SearchPoints> {
public QdrantSearchPointsOp(QdrantClient client, SearchPoints request) {
@ -30,8 +30,12 @@ public class QdrantSearchPointsOp extends QdrantBaseOp<SearchPoints> {
@Override
public Object applyOp(long value) {
ListenableFuture<List<ScoredPoint>> result =
client.searchAsync(request);
List<ScoredPoint> result = null;
try {
result = client.searchAsync(request).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
return result;
}
}

View File

@ -0,0 +1,51 @@
/*
* Copyright (c) 2020-2024 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.qdrant.ops;
import io.qdrant.client.QdrantClient;
import io.qdrant.client.grpc.Points.UpdateResult;
import io.qdrant.client.grpc.Points.UpsertPoints;
import java.util.concurrent.ExecutionException;
/**
 * Op that sends an {@link UpsertPoints} request through the Qdrant client and blocks until
 * the result is available.
 */
public class QdrantUpsertPointsOp extends QdrantBaseOp<UpsertPoints> {
    /**
     * @param client  the Qdrant client used to send the request
     * @param request the upsert request to send
     */
    public QdrantUpsertPointsOp(QdrantClient client, UpsertPoints request) {
        super(client, request);
    }

    /**
     * Execute the upsert, wait for the result and log its status.
     *
     * @param value the cycle number (not used by this op)
     * @return the {@link UpdateResult} returned by the client
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *                          {@link ExecutionException} raised while waiting for the result
     */
    @Override
    public Object applyOp(long value) {
        final UpdateResult response;
        try {
            response = client.upsertAsync(request).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }

        String responseStatus = response.getStatus().toString();
        long responseOperationId = response.getOperationId();
        switch (response.getStatus()) {
            case Completed, Acknowledged -> logger.trace("Upsert points finished successfully." +
                " [Status ({}) for Operation id ({})]", responseStatus, responseOperationId);
            case UnknownUpdateStatus, ClockRejected -> logger.error("Upsert points failed with status '{}'" +
                " for operation id '{}'", responseStatus, responseOperationId);
            default -> logger.error("Unknown status '{}' for operation id '{}'", responseStatus, responseOperationId);
        }
        return response;
    }
}

View File

@ -22,4 +22,7 @@ public enum QdrantOpType {
create_payload_index,
// https://qdrant.github.io/qdrant/redoc/index.html#tag/points/operation/search_points
search_points,
// https://qdrant.tech/documentation/concepts/points/
// https://qdrant.github.io/qdrant/redoc/index.html#tag/points/operation/upsert_points
upsert_points,
}

View File

@ -23,10 +23,15 @@ scenarios:
errors===stop
cycles===UNDEF threads===UNDEF
uri=TEMPLATE(qdranthost) token_file=TEMPLATE(token_file)
rampup: >-
run tags==block:rampup
errors===warn,counter
cycles===TEMPLATE(train_cycles,TEMPLATE(trainsize,1000)) threads===TEMPLATE(train_threads,AUTO)
uri=TEMPLATE(qdranthost) token_file=TEMPLATE(token_file)
search_points: >-
run tags==block:search_points
errors===stop
cycles===TEMPLATE(testann_cycles,1000) threads===TEMPLATE(testann_threads,AUTO)
errors===warn,counter
cycles===TEMPLATE(testann_cycles,TEMPLATE(testsize,1000)) threads===TEMPLATE(testann_threads,AUTO)
uri=TEMPLATE(qdranthost) token_file=TEMPLATE(token_file)
params:
@ -34,6 +39,7 @@ params:
instrument: true
bindings:
id_val: Identity();
row_key: ToString()
row_key_batch: Mul(TEMPLATE(batch_size)L); ListSizedStepped(TEMPLATE(batch_size),long->ToString());
# filetype=hdf5 for TEMPLATE(filetype,hdf5)
@ -48,6 +54,8 @@ bindings:
distance_floatlist_fvec: FVecReader("testdata/TEMPLATE(dataset)_TEMPLATE(testsize)_distances_count.fvec",TEMPLATE(dimensions),0);
train_floatlist_fvec: FVecReader("testdata/TEMPLATE(dataset)_TEMPLATE(trainsize)_base_vectors.fvec",TEMPLATE(dimensions),0);
train_floatlist_fvec_batch: Mul(TEMPLATE(batch_size,10)L); ListSizedStepped(TEMPLATE(batch_size),FVecReader("testdata/TEMPLATE(dataset)_TEMPLATE(trainsize)_base_vectors.fvec",TEMPLATE(dimensions),0));
test_2d_fl_vecs: HashedFloatVectors(2)
list_sized: ListSized(HashaRange(2,2), Identity(), Identity())
blocks:
delete_collection:
@ -108,6 +116,27 @@ blocks:
# full_scan_threshold: 100
# on_disk: true
rampup:
ops:
upsert_points_op:
upsert_points: "TEMPLATE(collection)"
wait: TEMPLATE(upsert_point_wait,true)
# https://github.com/qdrant/qdrant/blob/v1.9.0/lib/api/src/grpc/proto/points.proto#L11-L15
# 0 - Weak, 1 - Medium, 2 - Strong
ordering: TEMPLATE(upsert_point_ordering,1)
#shard_key: "{row_key}"
id: "{id_val}"
payload:
key: "{row_key}"
vector:
# For dense vectors, use the below format
value: "{train_floatlist_TEMPLATE(filetype)}"
# For sparse vectors, use the below format
#value_sv:
# indices: your array of numbers
# values: your array of floats
search_points:
ops:
search_points_op: