mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2024-11-29 12:04:04 -06:00
fleshed out PineconeQueryOpDispenser
This commit is contained in:
parent
7801959822
commit
06521cd98d
@ -9,12 +9,11 @@ import io.nosqlbench.adapter.pinecone.ops.PineconeQueryOp;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.pinecone.proto.QueryRequest;
|
||||
import io.pinecone.proto.QueryVector;
|
||||
import io.pinecone.proto.SparseValues;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Optional;
|
||||
import java.util.*;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class PineconeQueryOpDispenser extends PineconeOpDispenser {
|
||||
@ -93,33 +92,69 @@ public class PineconeQueryOpDispenser extends PineconeOpDispenser {
|
||||
String[] filterFields = filterFunction.get().apply(l).split(" ");
|
||||
return Struct.newBuilder().putFields(filterFields[0],
|
||||
Value.newBuilder().setStructValue(Struct.newBuilder().putFields(filterFields[1],
|
||||
Value.newBuilder().setNumberValue(Integer.valueOf(filterFields[2])).build()))
|
||||
Value.newBuilder().setNumberValue(Integer.parseInt(filterFields[2])).build()))
|
||||
.build()).build();
|
||||
};
|
||||
rFunc = l -> finalFunc.apply(l).setFilter(builtFilter.apply(l));
|
||||
}
|
||||
|
||||
LongFunction<QueryRequest.Builder> finalRFunc = rFunc;
|
||||
return l -> finalRFunc.apply(l);
|
||||
return rFunc;
|
||||
}
|
||||
|
||||
private LongFunction<Collection<QueryVector>> createQueryVectorFunc(ParsedOp op) {
|
||||
//Optional<LongFunction<Collection<Map<String,String>>>> baseFunc = op.getAsOptionalFunction("query_vectors", String.class);
|
||||
|
||||
// LongFunction<QueryVector.Builder> vFunc = l -> QueryVector.newBuilder();
|
||||
// LongFunction<QueryVector.Builder> finalVFunc = vFunc;
|
||||
//return l -> finalVFunc.apply(l).build();
|
||||
return l -> null;
|
||||
Optional<LongFunction<List>> baseFunc =
|
||||
op.getAsOptionalFunction("query_vectors", List.class);
|
||||
return baseFunc.<LongFunction<Collection<QueryVector>>>map(listLongFunction -> l -> {
|
||||
List<QueryVector> returnVectors = new ArrayList<>();
|
||||
List<Map<String, Object>> vectors = listLongFunction.apply(l);
|
||||
for (Map<String, Object> vector : vectors) {
|
||||
QueryVector.Builder qvb = QueryVector.newBuilder();
|
||||
String[] rawValues = ((String) vector.get("values")).split(",");
|
||||
ArrayList<Float> floatValues = new ArrayList<>();
|
||||
for (String val : rawValues) {
|
||||
floatValues.add(Float.valueOf(val));
|
||||
}
|
||||
qvb.addAllValues(floatValues);
|
||||
qvb.setNamespace((String) vector.get("namespace"));
|
||||
if (vector.containsKey("top_k")) {
|
||||
qvb.setTopK((Integer) vector.get("top_k"));
|
||||
}
|
||||
if (vector.containsKey("filter")) {
|
||||
String[] rawVals = ((String)vector.get("filter")).split(" ");
|
||||
qvb.setFilter(Struct.newBuilder().putFields(rawVals[0],
|
||||
Value.newBuilder().setStructValue(Struct.newBuilder().putFields(rawVals[1],
|
||||
Value.newBuilder().setNumberValue(Integer.parseInt(rawVals[2])).build()))
|
||||
.build()).build());
|
||||
}
|
||||
if (vector.containsKey("sparse_values")) {
|
||||
Map<String,String> sparse_values = (Map<String, String>) vector.get("sparse_values");
|
||||
rawValues = ((String) sparse_values.get("values")).split(",");
|
||||
floatValues = new ArrayList<>();
|
||||
for (String val : rawValues) {
|
||||
floatValues.add(Float.valueOf(val));
|
||||
}
|
||||
rawValues = sparse_values.get("indices").split(",");
|
||||
List<Integer> intValues = new ArrayList<>();
|
||||
for (String val : rawValues) {
|
||||
intValues.add(Integer.valueOf(val));
|
||||
}
|
||||
qvb.setSparseValues(SparseValues.newBuilder()
|
||||
.addAllValues(floatValues)
|
||||
.addAllIndices(intValues)
|
||||
.build());
|
||||
}
|
||||
returnVectors.add(qvb.build());
|
||||
}
|
||||
return returnVectors;
|
||||
}).orElse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public PineconeOp apply(long value) {
|
||||
QueryRequest.Builder qrb = queryRequestFunc.apply(value);
|
||||
Collection<QueryVector> vectors = queryVectorFunc.apply(value);
|
||||
if (vectors != null) {
|
||||
qrb.addAllQueries(vectors);
|
||||
if (queryVectorFunc != null) {
|
||||
qrb.addAllQueries(queryVectorFunc.apply(value));
|
||||
}
|
||||
|
||||
return new PineconeQueryOp(pcFunction.apply(value).getConnection(targetFunction.apply(value)), qrb.build());
|
||||
}
|
||||
}
|
||||
|
@ -1,20 +1,25 @@
|
||||
package io.nosqlbench.adapter.pinecone.opdispensers;
|
||||
|
||||
import com.google.protobuf.Struct;
|
||||
import io.nosqlbench.adapter.pinecone.PineconeDriverAdapter;
|
||||
import io.nosqlbench.adapter.pinecone.PineconeSpace;
|
||||
import io.nosqlbench.adapter.pinecone.ops.PineconeOp;
|
||||
import io.nosqlbench.adapter.pinecone.ops.PineconeUpdateOp;
|
||||
import io.nosqlbench.engine.api.templating.ParsedOp;
|
||||
import io.pinecone.proto.SparseValues;
|
||||
import io.pinecone.proto.UpdateRequest;
|
||||
import jakarta.ws.rs.NotSupportedException;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Optional;
|
||||
import java.util.function.LongFunction;
|
||||
|
||||
public class PineconeUpdateOpDispenser extends PineconeOpDispenser {
|
||||
private static final Logger LOGGER = LogManager.getLogger(PineconeUpdateOpDispenser.class);
|
||||
private final LongFunction<UpdateRequest> updateRequestFunc;
|
||||
private final LongFunction<UpdateRequest.Builder> updateRequestFunc;
|
||||
private final LongFunction<Struct> updateMetadataFunc;
|
||||
private final LongFunction<SparseValues> sparseValuesFunc;
|
||||
|
||||
/**
|
||||
* Create a new PineconeUpdateOpDispenser subclassed from {@link PineconeOpDispenser}.
|
||||
@ -30,15 +35,79 @@ public class PineconeUpdateOpDispenser extends PineconeOpDispenser {
|
||||
LongFunction<String> targetFunction) {
|
||||
super(adapter, op, pcFunction, targetFunction);
|
||||
updateRequestFunc = createUpdateRequestFunction(op);
|
||||
updateMetadataFunc = createUpdateMetadataFunction(op);
|
||||
sparseValuesFunc = createSparseValuesFunction(op);
|
||||
}
|
||||
|
||||
private LongFunction<UpdateRequest> createUpdateRequestFunction(ParsedOp op) {
|
||||
throw new NotSupportedException("Pinecone Update Request Op not yet supported");
|
||||
private LongFunction<SparseValues> createSparseValuesFunction(ParsedOp op) {
|
||||
return null;
|
||||
}
|
||||
|
||||
private LongFunction<Struct> createUpdateMetadataFunction(ParsedOp op) {
|
||||
//new Struct.newBuilder(
|
||||
// UpdateRequest.newBuilder().getSetMetadataBuilder().putAllFields(Map<String,Value>)))
|
||||
return null;
|
||||
}
|
||||
|
||||
/*
|
||||
update-example:
|
||||
type: update
|
||||
index: update_index
|
||||
id: string_id
|
||||
values: list_of_floats
|
||||
namespace: update_namespace
|
||||
metadata:
|
||||
- key1: val1
|
||||
- key2: val2
|
||||
- key3: val3
|
||||
sparse_values:
|
||||
indices: list_of_ints
|
||||
values: list_of_floats
|
||||
*/
|
||||
private LongFunction<UpdateRequest.Builder> createUpdateRequestFunction(ParsedOp op) {
|
||||
LongFunction<UpdateRequest.Builder> rFunc = l -> UpdateRequest.newBuilder();
|
||||
|
||||
Optional<LongFunction<String>> nFunc = op.getAsOptionalFunction("namespace", String.class);
|
||||
if (nFunc.isPresent()) {
|
||||
LongFunction<UpdateRequest.Builder> finalFunc = rFunc;
|
||||
LongFunction<String> af = nFunc.get();
|
||||
rFunc = l -> finalFunc.apply(l).setNamespace(af.apply(l));
|
||||
}
|
||||
|
||||
Optional<LongFunction<String>> iFunc = op.getAsOptionalFunction("id", String.class);
|
||||
if (iFunc.isPresent()) {
|
||||
LongFunction<UpdateRequest.Builder> finalFunc = rFunc;
|
||||
LongFunction<String> af = iFunc.get();
|
||||
rFunc = l -> finalFunc.apply(l).setId(af.apply(l));
|
||||
}
|
||||
|
||||
Optional<LongFunction<String>> vFunc = op.getAsOptionalFunction("values", String.class);
|
||||
if (vFunc.isPresent()) {
|
||||
LongFunction<UpdateRequest.Builder> finalFunc = rFunc;
|
||||
LongFunction<String> af = vFunc.get();
|
||||
LongFunction<ArrayList<Float>> alf = l -> {
|
||||
String[] vals = af.apply(l).split(",");
|
||||
ArrayList<Float> fVals = new ArrayList<>();
|
||||
for (String val : vals) {
|
||||
fVals.add(Float.valueOf(val));
|
||||
}
|
||||
return fVals;
|
||||
};
|
||||
rFunc = l -> finalFunc.apply(l).addAllValues(alf.apply(l));
|
||||
}
|
||||
|
||||
return rFunc;
|
||||
}
|
||||
|
||||
@Override
|
||||
public PineconeOp apply(long value) {
|
||||
return new PineconeUpdateOp(pcFunction.apply(value).getConnection(targetFunction.apply(value)),
|
||||
updateRequestFunc.apply(value));
|
||||
UpdateRequest.Builder urb = updateRequestFunc.apply(value);
|
||||
if (updateMetadataFunc != null) {
|
||||
urb.setSetMetadata(updateMetadataFunc.apply(value));
|
||||
}
|
||||
if (sparseValuesFunc != null) {
|
||||
urb.setSparseValues(sparseValuesFunc.apply(value));
|
||||
}
|
||||
return new PineconeUpdateOp(pcFunction.apply(value).getConnection(targetFunction.apply(value)), urb.build());
|
||||
}
|
||||
}
|
||||
|
@ -67,7 +67,7 @@ ops:
|
||||
id: string_id
|
||||
values: list_of_floats
|
||||
namespace: update_namespace
|
||||
set_metadata:
|
||||
metadata:
|
||||
- key1: val1
|
||||
- key2: val2
|
||||
- key3: val3
|
||||
|
@ -1,10 +1,7 @@
|
||||
package io.nosqlbench.adapter.pinecone;
|
||||
|
||||
import io.nosqlbench.adapter.pinecone.opdispensers.PineconeDeleteOpDispenser;
|
||||
import io.nosqlbench.adapter.pinecone.opdispensers.PineconeQueryOpDispenser;
|
||||
import io.nosqlbench.adapter.pinecone.ops.PineconeDeleteOp;
|
||||
import io.nosqlbench.adapter.pinecone.ops.PineconeOp;
|
||||
import io.nosqlbench.adapter.pinecone.ops.PineconeQueryOp;
|
||||
import io.nosqlbench.adapter.pinecone.opdispensers.*;
|
||||
import io.nosqlbench.adapter.pinecone.ops.*;
|
||||
import io.nosqlbench.api.config.NBLabeledElement;
|
||||
import io.nosqlbench.api.config.standard.NBConfiguration;
|
||||
import io.nosqlbench.engine.api.activityconfig.OpsLoader;
|
||||
@ -102,29 +99,89 @@ public class PineconeOpMapperTest {
|
||||
|
||||
@Test
|
||||
public void testDescribeIndexStatsOpDispenser() {
|
||||
ParsedOp pop = parsedOpFor("""
|
||||
ops:
|
||||
op1:
|
||||
type: "describeindexstats"
|
||||
index: "test-index"
|
||||
filter: "value $gt 10"
|
||||
""");
|
||||
OpDispenser<? extends PineconeOp> dispenser = mapper.apply(pop);
|
||||
assert(dispenser instanceof PineconeDescribeIndexStatsOpDispenser);
|
||||
PineconeOp op = dispenser.apply(0);
|
||||
assert(op instanceof PineconeDescribeIndexStatsOp);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFetchOpDispenser() {
|
||||
ParsedOp pop = parsedOpFor("""
|
||||
ops:
|
||||
op1:
|
||||
type: "fetch"
|
||||
index: "test-index"
|
||||
ids: "1.0,2.0,3.0"
|
||||
namespace: "test-namespace"
|
||||
""");
|
||||
OpDispenser<? extends PineconeOp> dispenser = mapper.apply(pop);
|
||||
assert(dispenser instanceof PineconeFetchOpDispenser);
|
||||
PineconeOp op = dispenser.apply(0);
|
||||
assert(op instanceof PineconeFetchOp);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpdateOpDispenser() {
|
||||
ParsedOp pop = parsedOpFor("""
|
||||
ops:
|
||||
op1:
|
||||
type: "update"
|
||||
index: "test-index"
|
||||
id: "id"
|
||||
values: "1.0,2.0,3.0"
|
||||
namespace: "test_namespace"
|
||||
metadata:
|
||||
- key1: val1
|
||||
- key2: val2
|
||||
- key3: val3
|
||||
sparse_values:
|
||||
indices: list_of_ints
|
||||
values: list_of_floats
|
||||
""");
|
||||
OpDispenser<? extends PineconeOp> dispenser = mapper.apply(pop);
|
||||
assert(dispenser instanceof PineconeUpdateOpDispenser);
|
||||
PineconeOp op = dispenser.apply(0);
|
||||
assert(op instanceof PineconeUpdateOp);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUpsertOpDispenser() {
|
||||
ParsedOp pop = parsedOpFor("""
|
||||
ops:
|
||||
op1:
|
||||
type: "upsert"
|
||||
index: "test-index"
|
||||
upsert_vectors:
|
||||
- id: 1
|
||||
values: csv_separated_floats
|
||||
sparse_values:
|
||||
indices: list_of_ints
|
||||
values: list_of_floats
|
||||
metadata:
|
||||
- key1: val1
|
||||
- key2: val2
|
||||
- id: 2
|
||||
values: csv_separated_floats
|
||||
sparse_values:
|
||||
indices: list_of_ints
|
||||
values: list_of_floats
|
||||
""");
|
||||
OpDispenser<? extends PineconeOp> dispenser = mapper.apply(pop);
|
||||
assert(dispenser instanceof PineconeUpsertOpDispenser);
|
||||
PineconeOp op = dispenser.apply(0);
|
||||
assert(op instanceof PineconeUpsertOp);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueryOpDispenserComplex() {
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user