functionalizing the pinecone dispensers

This commit is contained in:
Mark Wolters 2023-05-10 17:51:04 +00:00 committed by Madhavan
parent 15a13f7bcb
commit a38047f469
6 changed files with 181 additions and 65 deletions

View File

@ -3,6 +3,7 @@ package io.nosqlbench.adapter.pinecone;
import io.nosqlbench.adapter.pinecone.opdispensers.*;
import io.nosqlbench.adapter.pinecone.ops.PineconeOp;
import io.nosqlbench.adapter.pinecone.ops.PineconeOpTypes;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
import io.nosqlbench.engine.api.activityimpl.OpMapper;
import io.nosqlbench.engine.api.activityimpl.uniform.DriverSpaceCache;
@ -17,10 +18,12 @@ public class PineconeOpMapper implements OpMapper<PineconeOp> {
private static final Logger LOGGER = LogManager.getLogger(PineconeOpMapper.class);
private final PineconeDriverAdapter adapter;
private final DriverSpaceCache<? extends PineconeSpace> spaceCache;
private final NBConfiguration cfg;
public PineconeOpMapper(PineconeDriverAdapter adapter, DriverSpaceCache<? extends PineconeSpace> spaceCache) {
public PineconeOpMapper(PineconeDriverAdapter adapter, DriverSpaceCache<? extends PineconeSpace> spaceCache, NBConfiguration cfg) {
this.adapter = adapter;
this.spaceCache = spaceCache;
this.cfg = cfg;
}
@Override

View File

@ -8,12 +8,23 @@ import io.nosqlbench.engine.api.templating.ParsedOp;
import io.pinecone.PineconeConnection;
import io.pinecone.proto.DeleteRequest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.LongFunction;
/**
* return DeleteRequest.newBuilder()
* .setNamespace(namespace)
* .addAllIds(Arrays.asList(idsToDelete))
* .setDeleteAll(false)
* .build();
*/
public class PineconeDeleteOpDispenser extends PineconeOpDispenser {
private DeleteRequest request;
private PineconeConnection connection;
private final String indexName;
private final LongFunction<DeleteRequest> deleteRequestFunc;
public PineconeDeleteOpDispenser(PineconeDriverAdapter adapter,
ParsedOp op,
@ -21,26 +32,45 @@ public class PineconeDeleteOpDispenser extends PineconeOpDispenser {
LongFunction<String> targetFunction) {
super(adapter, op, pcFunction, targetFunction);
String indexName = op.getStaticValue("delete");
connection = pcFunction.apply(0).getConnection(indexName);
request = createDeleteRequest();
indexName = op.getAsRequiredFunction("query", String.class).apply(0);
deleteRequestFunc = createDeleteRequestFunction(op);
}
@Override
public PineconeOp apply(long value) {
return new PineconeDeleteOp(connection, request);
return new PineconeDeleteOp(pcFunction.apply(value).getConnection(indexName), deleteRequestFunc.apply(value));
}
private DeleteRequest createDeleteRequest() {
// TODO: How do I pull these from the ParsedOp?
String[] idsToDelete = {"v2"};
String namespace = "ns";
private LongFunction<DeleteRequest> createDeleteRequestFunction(ParsedOp op) {
LongFunction<DeleteRequest.Builder> rFunc = l -> DeleteRequest.newBuilder();
return DeleteRequest.newBuilder()
.setNamespace(namespace)
.addAllIds(Arrays.asList(idsToDelete))
.setDeleteAll(false)
.build();
Optional<LongFunction<String>> nFunc = op.getAsOptionalFunction("namespace", String.class);
if (nFunc.isPresent()) {
LongFunction<DeleteRequest.Builder> finalFunc = rFunc;
LongFunction<String> af = nFunc.get();
rFunc = l -> finalFunc.apply(l).setNamespace(af.apply(l));
}
Optional<LongFunction<String>> iFunc = op.getAsOptionalFunction("ids", String.class);
if (iFunc.isPresent()) {
LongFunction<DeleteRequest.Builder> finalFunc = rFunc;
LongFunction<String> af = iFunc.get();
LongFunction<List<String>> alf = l -> {
String[] vals = af.apply(l).split(",");
return Arrays.asList(vals);
};
rFunc = l -> finalFunc.apply(l).addAllIds(alf.apply(l));
}
Optional<LongFunction<Boolean>> aFunc = op.getAsOptionalFunction("deleteall", Boolean.class);
if (aFunc.isPresent()) {
LongFunction<DeleteRequest.Builder> finalFunc = rFunc;
LongFunction<Boolean> af = aFunc.get();
rFunc = l -> finalFunc.apply(l).setDeleteAll(af.apply(l));
}
LongFunction<DeleteRequest.Builder> finalRFunc = rFunc;
return l -> finalRFunc.apply(l).build();
}

View File

@ -11,8 +11,8 @@ import io.pinecone.proto.DescribeIndexStatsRequest;
import java.util.function.LongFunction;
public class PineconeDescribeIndexStatsOpDispenser extends PineconeOpDispenser {
private DescribeIndexStatsRequest request;
private PineconeConnection connection;
private final String indexName;
private final LongFunction<DescribeIndexStatsRequest> indexStatsRequestFunc;
public PineconeDescribeIndexStatsOpDispenser(PineconeDriverAdapter adapter,
ParsedOp op,
@ -20,17 +20,17 @@ public class PineconeDescribeIndexStatsOpDispenser extends PineconeOpDispenser {
LongFunction<String> targetFunction) {
super(adapter, op, pcFunction, targetFunction);
String indexName = op.getStaticValue("describeIndexStats");
connection = pcFunction.apply(0).getConnection(indexName);
request = createDescribeIndexStatsRequest();
indexName = op.getAsRequiredFunction("indexStatsRequest", String.class).apply(0);
indexStatsRequestFunc = createDescribeIndexStatsRequestFunction(op);
}
private DescribeIndexStatsRequest createDescribeIndexStatsRequest() {
return DescribeIndexStatsRequest.newBuilder().build();
private LongFunction<DescribeIndexStatsRequest> createDescribeIndexStatsRequestFunction(ParsedOp op) {
LongFunction<DescribeIndexStatsRequest.Builder> rFunc = l -> DescribeIndexStatsRequest.newBuilder();
return l -> rFunc.apply(l).build();
}
@Override
public PineconeOp apply(long value) {
return new PineconeDescribeIndexStatsOp(connection, request);
return new PineconeDescribeIndexStatsOp(pcFunction.apply(value).getConnection(indexName), indexStatsRequestFunc.apply(value));
}
}

View File

@ -10,30 +10,49 @@ import io.pinecone.proto.FetchRequest;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.LongFunction;
public class PineconeFetchOpDispenser extends PineconeOpDispenser {
private FetchRequest request;
private PineconeConnection connection;
private final LongFunction<FetchRequest> fetchRequestFunc;
private final String indexName;
public PineconeFetchOpDispenser(PineconeDriverAdapter adapter,
ParsedOp op,
LongFunction<PineconeSpace> pcFunction,
LongFunction<String> targetFunction) {
super(adapter, op, pcFunction, targetFunction);
String indexName = op.getStaticValue("fetch");
connection = pcFunction.apply(0).getConnection(indexName);
request = createFetchRequest();
indexName = op.getAsRequiredFunction("fetch", String.class).apply(0);
fetchRequestFunc = createFetchRequestFunction(op);
}
private FetchRequest createFetchRequest() {
List<String> ids = Arrays.asList("v1","v2");
return FetchRequest.newBuilder().addAllIds(ids).setNamespace("default-namespace").build();
private LongFunction<FetchRequest> createFetchRequestFunction(ParsedOp op) {
LongFunction<FetchRequest.Builder> rFunc = l -> FetchRequest.newBuilder();
Optional<LongFunction<String>> nFunc = op.getAsOptionalFunction("namespace", String.class);
if (nFunc.isPresent()) {
LongFunction<FetchRequest.Builder> finalFunc = rFunc;
LongFunction<String> af = nFunc.get();
rFunc = l -> finalFunc.apply(l).setNamespace(af.apply(l));
}
Optional<LongFunction<String>> iFunc = op.getAsOptionalFunction("ids", String.class);
if (iFunc.isPresent()) {
LongFunction<FetchRequest.Builder> finalFunc = rFunc;
LongFunction<String> af = iFunc.get();
LongFunction<List<String>> alf = l -> {
String[] vals = af.apply(l).split(",");
return Arrays.asList(vals);
};
rFunc = l -> finalFunc.apply(l).addAllIds(alf.apply(l));
}
LongFunction<FetchRequest.Builder> finalRFunc = rFunc;
return l -> finalRFunc.apply(l).build();
}
@Override
public PineconeOp apply(long value) {
return new PineconeFetchOp(connection, request);
return new PineconeFetchOp(pcFunction.apply(value).getConnection(indexName), fetchRequestFunc.apply(value));
}
}

View File

@ -1,22 +1,44 @@
package io.nosqlbench.adapter.pinecone.opdispensers;
import com.google.common.primitives.Floats;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import io.nosqlbench.adapter.pinecone.PineconeDriverAdapter;
import io.nosqlbench.adapter.pinecone.PineconeSpace;
import io.nosqlbench.adapter.pinecone.ops.PineconeOp;
import io.nosqlbench.adapter.pinecone.ops.PineconeQueryOp;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.pinecone.PineconeConnection;
import io.pinecone.proto.QueryRequest;
import io.pinecone.proto.QueryVector;
import java.util.ArrayList;
import java.util.Optional;
import java.util.function.LongFunction;
/**
float[] rawVector = {1.0F, 2.0F, 3.0F};
QueryVector queryVector = QueryVector.newBuilder()
.addAllValues(Floats.asList(rawVector))
.setFilter(Struct.newBuilder()
.putFields("some_field", Value.newBuilder()
.setStructValue(Struct.newBuilder()
.putFields("$lt", Value.newBuilder()
.setNumberValue(3)
.build()))
.build())
.build())
.setNamespace("default-namespace")
.build();
QueryRequest queryRequest = QueryRequest.newBuilder()
.addQueries(queryVector)
.setNamespace("default-namespace")
.setTopK(2)
.setIncludeMetadata(true)
.build();
}
**/
public class PineconeQueryOpDispenser extends PineconeOpDispenser {
private QueryRequest request;
private PineconeConnection connection;
private final LongFunction<QueryRequest> queryRequestFunc;
private final String indexName;
public PineconeQueryOpDispenser(PineconeDriverAdapter adapter,
ParsedOp op,
@ -24,36 +46,72 @@ public class PineconeQueryOpDispenser extends PineconeOpDispenser {
LongFunction<String> targetFunction) {
super(adapter, op, pcFunction, targetFunction);
String indexName = op.getStaticValue("query");
connection = pcFunction.apply(0).getConnection(indexName);
request = createQueryRequest();
indexName = op.getAsRequiredFunction("query", String.class).apply(0);
queryRequestFunc = createQueryRequestFunc(op, createQueryVectorFunc(op));
}
private QueryRequest createQueryRequest() {
float[] rawVector = {1.0F, 2.0F, 3.0F};
QueryVector queryVector = QueryVector.newBuilder()
.addAllValues(Floats.asList(rawVector))
.setFilter(Struct.newBuilder()
.putFields("some_field", Value.newBuilder()
.setStructValue(Struct.newBuilder()
.putFields("$lt", Value.newBuilder()
.setNumberValue(3)
.build()))
.build())
.build())
.setNamespace("default-namespace")
.build();
private LongFunction<QueryRequest> createQueryRequestFunc(ParsedOp op, LongFunction<QueryVector> queryVectorFunc) {
LongFunction<QueryRequest.Builder> rFunc = l -> QueryRequest.newBuilder();
return QueryRequest.newBuilder()
.addQueries(queryVector)
.setNamespace("default-namespace")
.setTopK(2)
.setIncludeMetadata(true)
.build();
Optional<LongFunction<String>> nFunc = op.getAsOptionalFunction("namespace", String.class);
if (nFunc.isPresent()) {
LongFunction<QueryRequest.Builder> finalFunc = rFunc;
LongFunction<String> af = nFunc.get();
rFunc = l -> finalFunc.apply(l).setNamespace(af.apply(l));
}
Optional<LongFunction<Integer>> tFunc = op.getAsOptionalFunction("topk", Integer.class);
if (tFunc.isPresent()) {
LongFunction<QueryRequest.Builder> finalFunc = rFunc;
LongFunction<Integer> af = tFunc.get();
rFunc = l -> finalFunc.apply(l).setTopK(af.apply(l));
}
Optional<LongFunction<Boolean>> mFunc = op.getAsOptionalFunction("includemetadata", Boolean.class);
if (mFunc.isPresent()) {
LongFunction<QueryRequest.Builder> finalFunc = rFunc;
LongFunction<Boolean> af = mFunc.get();
rFunc = l -> finalFunc.apply(l).setIncludeMetadata(af.apply(l));
}
LongFunction<QueryRequest.Builder> returnFunc = rFunc;
rFunc = l -> returnFunc.apply(l).addQueries(queryVectorFunc.apply(l));
LongFunction<QueryRequest.Builder> finalRFunc = rFunc;
return l -> finalRFunc.apply(l).build();
}
private LongFunction<QueryVector> createQueryVectorFunc(ParsedOp op) {
LongFunction<QueryVector.Builder> vFunc = l -> QueryVector.newBuilder();
Optional<LongFunction<String>> qFunc = op.getAsOptionalFunction("query", String.class);
if (qFunc.isPresent()) {
LongFunction<QueryVector.Builder> finalFunc = vFunc;
LongFunction<String> af = qFunc.get();
LongFunction<ArrayList<Float>> alf = l -> {
String[] vals = af.apply(l).split(",");
ArrayList<Float> fVals = new ArrayList<Float>();
for (int i = 0; i < vals.length; i++) {
fVals.add(Float.valueOf(vals[i]));
}
return fVals;
};
vFunc = l -> finalFunc.apply(l).addAllValues(alf.apply(l));
}
Optional<LongFunction<String>> nFunc = op.getAsOptionalFunction("namespace", String.class);
if (nFunc.isPresent()) {
LongFunction<QueryVector.Builder> finalFunc = vFunc;
LongFunction<String> af = nFunc.get();
vFunc = l -> finalFunc.apply(l).setNamespace(af.apply(l));
}
//TODO: Add in optional filters
LongFunction<QueryVector.Builder> finalVFunc = vFunc;
return l -> finalVFunc.apply(l).build();
}
@Override
public PineconeOp apply(long value) {
return new PineconeQueryOp(connection, request);
return new PineconeQueryOp(pcFunction.apply(value).getConnection(indexName), queryRequestFunc.apply(value));
}
}

View File

@ -6,7 +6,13 @@ ops:
# A pinecone query op
query-example:
query: myindex
... additional fields ...
vector: use bindings to generate an array of floats
namespace: mynamespace
filter:
field:
operator:
comparator:
# A delete op
delete-example: