PineconeUpsertOpDispenser

This commit is contained in:
Mark Wolters 2023-05-18 19:04:02 +00:00 committed by Madhavan
parent 4c6607df35
commit 7eb58982e6
2 changed files with 76 additions and 26 deletions

View File

@ -71,7 +71,7 @@ public class PineconeUpdateOpDispenser extends PineconeOpDispenser {
else if (val instanceof Number) targetval = Value.newBuilder().setNumberValue((((Number) val).doubleValue())).build();
metadata_map.put(key, targetval);
};
Map<String, String> metadata_values_map = mapLongFunction.apply(l);
Map<String, Object> metadata_values_map = mapLongFunction.apply(l);
metadata_values_map.forEach(stringToValue);
return UpdateRequest.newBuilder().getSetMetadataBuilder().putAllFields(metadata_map).build();
}).orElse(null);

View File

@ -1,20 +1,28 @@
package io.nosqlbench.adapter.pinecone.opdispensers;
import com.google.protobuf.Struct;
import com.google.protobuf.Value;
import io.nosqlbench.adapter.pinecone.PineconeDriverAdapter;
import io.nosqlbench.adapter.pinecone.PineconeSpace;
import io.nosqlbench.adapter.pinecone.ops.PineconeOp;
import io.nosqlbench.adapter.pinecone.ops.PineconeUpsertOp;
import io.nosqlbench.engine.api.templating.ParsedOp;
import io.pinecone.proto.SparseValues;
import io.pinecone.proto.UpdateRequest;
import io.pinecone.proto.UpsertRequest;
import io.pinecone.proto.Vector;
import jakarta.ws.rs.NotSupportedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.LongFunction;
public class PineconeUpsertOpDispenser extends PineconeOpDispenser {
private static final Logger LOGGER = LogManager.getLogger(PineconeUpsertOpDispenser.class);
private final LongFunction<UpsertRequest> upsertRequestFunc;
private final LongFunction<UpsertRequest.Builder> upsertRequestFunc;
private final LongFunction<Collection<Vector>> upsertVectorFunc;
/**
* Create a new PineconeUpsertOpDispenser subclassed from {@link PineconeOpDispenser}.
@ -30,35 +38,77 @@ public class PineconeUpsertOpDispenser extends PineconeOpDispenser {
LongFunction<String> targetFunction) {
super(adapter, op, pcFunction, targetFunction);
upsertRequestFunc = createUpsertRequestFunc(op);
upsertVectorFunc = createUpsertRequestVectorsFunc(op);
}
/*
*float[][] upsertData = {{1.0F, 2.0F, 3.0F}, {4.0F, 5.0F, 6.0F}, {7.0F, 8.0F, 9.0F}};
*List<String> upsertIds = Arrays.asList("v1", "v2", "v3");
*List<Vector> upsertVectors = new ArrayList<>();
*
*for (int i = 0; i < upsertData.length; i++) {
*upsertVectors.add(Vector.newBuilder()
*.addAllValues(Floats.asList(upsertData[i]))
*.setMetadata(Struct.newBuilder()
*.putFields("some_field", Value.newBuilder().setNumberValue(i).build())
*.build())
*.setId(upsertIds.get(i))
*.build());
* }
*
*return UpsertRequest.newBuilder()
*.addAllVectors(upsertVectors)
*.setNamespace("default-namespace")
*.build();
*/
private LongFunction<UpsertRequest> createUpsertRequestFunc(ParsedOp op) {
throw new NotSupportedException("Pinecone Upsert Request Op not yet supported");
private LongFunction<Collection<Vector>> createUpsertRequestVectorsFunc(ParsedOp op) {
Optional<LongFunction<List>> baseFunc =
op.getAsOptionalFunction("upsert_vectors", List.class);
return baseFunc.<LongFunction<Collection<Vector>>>map(listLongFunction -> l -> {
List<Vector> returnVectors = new ArrayList<>();
List<Map<String, Object>> vectors = listLongFunction.apply(l);
for (Map<String, Object> vector : vectors) {
Vector.Builder vb = Vector.newBuilder();
String[] rawValues = ((String) vector.get("values")).split(",");
ArrayList<Float> floatValues = new ArrayList<>();
for (String val : rawValues) {
floatValues.add(Float.valueOf(val));
}
vb.addAllValues(floatValues);
if (vector.containsKey("sparse_values")) {
Map<String,String> sparse_values = (Map<String, String>) vector.get("sparse_values");
rawValues = ((String) sparse_values.get("values")).split(",");
floatValues = new ArrayList<>();
for (String val : rawValues) {
floatValues.add(Float.valueOf(val));
}
rawValues = sparse_values.get("indices").split(",");
List<Integer> intValues = new ArrayList<>();
for (String val : rawValues) {
intValues.add(Integer.valueOf(val));
}
vb.setSparseValues(SparseValues.newBuilder()
.addAllValues(floatValues)
.addAllIndices(intValues)
.build());
}
if (vector.containsKey("metadata")) {
Map<String, Value> metadata_map = new HashMap<String, Value>();
BiConsumer<String,Object> stringToValue = (key, val) -> {
Value targetval = null;
if (val instanceof String) targetval = Value.newBuilder().setStringValue((String)val).build();
else if (val instanceof Number) targetval = Value.newBuilder().setNumberValue((((Number) val).doubleValue())).build();
metadata_map.put(key, targetval);
};
Map<String, Object> metadata_values_map = (Map<String, Object>) vector.get("metadata");
metadata_values_map.forEach(stringToValue);
vb.setMetadata(Struct.newBuilder().putAllFields(metadata_map).build());
}
returnVectors.add(vb.build());
}
return returnVectors;
}).orElse(null);
}
private LongFunction<UpsertRequest.Builder> createUpsertRequestFunc(ParsedOp op) {
LongFunction<UpsertRequest.Builder> rFunc = l -> UpsertRequest.newBuilder();
Optional<LongFunction<String>> nFunc = op.getAsOptionalFunction("namespace", String.class);
if (nFunc.isPresent()) {
LongFunction<UpsertRequest.Builder> finalFunc = rFunc;
LongFunction<String> af = nFunc.get();
rFunc = l -> finalFunc.apply(l).setNamespace(af.apply(l));
}
return rFunc;
}
@Override
public PineconeOp apply(long value) {
return new PineconeUpsertOp(pcFunction.apply(value).getConnection(targetFunction.apply(value)),
upsertRequestFunc.apply(value));
UpsertRequest.Builder urb = upsertRequestFunc.apply(value);
if (upsertVectorFunc != null) {
urb.addAllVectors(upsertVectorFunc.apply(value));
}
return new PineconeUpsertOp(pcFunction.apply(value).getConnection(targetFunction.apply(value)), urb.build());
}
}