From 7eb58982e61b5c91e1cc4d8bea372c82618bc96c Mon Sep 17 00:00:00 2001 From: Mark Wolters Date: Thu, 18 May 2023 19:04:02 +0000 Subject: [PATCH] PineconeUpsertOpDispenser --- .../PineconeUpdateOpDispenser.java | 2 +- .../PineconeUpsertOpDispenser.java | 100 +++++++++++++----- 2 files changed, 76 insertions(+), 26 deletions(-) diff --git a/adapter-pinecone/src/main/java/io/nosqlbench/adapter/pinecone/opdispensers/PineconeUpdateOpDispenser.java b/adapter-pinecone/src/main/java/io/nosqlbench/adapter/pinecone/opdispensers/PineconeUpdateOpDispenser.java index debae1a1d..436628d5f 100644 --- a/adapter-pinecone/src/main/java/io/nosqlbench/adapter/pinecone/opdispensers/PineconeUpdateOpDispenser.java +++ b/adapter-pinecone/src/main/java/io/nosqlbench/adapter/pinecone/opdispensers/PineconeUpdateOpDispenser.java @@ -71,7 +71,7 @@ public class PineconeUpdateOpDispenser extends PineconeOpDispenser { else if (val instanceof Number) targetval = Value.newBuilder().setNumberValue((((Number) val).doubleValue())).build(); metadata_map.put(key, targetval); }; - Map metadata_values_map = mapLongFunction.apply(l); + Map metadata_values_map = mapLongFunction.apply(l); metadata_values_map.forEach(stringToValue); return UpdateRequest.newBuilder().getSetMetadataBuilder().putAllFields(metadata_map).build(); }).orElse(null); diff --git a/adapter-pinecone/src/main/java/io/nosqlbench/adapter/pinecone/opdispensers/PineconeUpsertOpDispenser.java b/adapter-pinecone/src/main/java/io/nosqlbench/adapter/pinecone/opdispensers/PineconeUpsertOpDispenser.java index 88d684dfd..d4adcaf34 100644 --- a/adapter-pinecone/src/main/java/io/nosqlbench/adapter/pinecone/opdispensers/PineconeUpsertOpDispenser.java +++ b/adapter-pinecone/src/main/java/io/nosqlbench/adapter/pinecone/opdispensers/PineconeUpsertOpDispenser.java @@ -1,20 +1,28 @@ package io.nosqlbench.adapter.pinecone.opdispensers; +import com.google.protobuf.Struct; +import com.google.protobuf.Value; import io.nosqlbench.adapter.pinecone.PineconeDriverAdapter; import io.nosqlbench.adapter.pinecone.PineconeSpace; import io.nosqlbench.adapter.pinecone.ops.PineconeOp; import io.nosqlbench.adapter.pinecone.ops.PineconeUpsertOp; import io.nosqlbench.engine.api.templating.ParsedOp; +import io.pinecone.proto.SparseValues; +import io.pinecone.proto.UpdateRequest; import io.pinecone.proto.UpsertRequest; +import io.pinecone.proto.Vector; import jakarta.ws.rs.NotSupportedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.*; +import java.util.function.BiConsumer; import java.util.function.LongFunction; public class PineconeUpsertOpDispenser extends PineconeOpDispenser { private static final Logger LOGGER = LogManager.getLogger(PineconeUpsertOpDispenser.class); - private final LongFunction upsertRequestFunc; + private final LongFunction upsertRequestFunc; + private final LongFunction> upsertVectorFunc; /** * Create a new PineconeUpsertOpDispenser subclassed from {@link PineconeOpDispenser}. @@ -30,35 +38,77 @@ public class PineconeUpsertOpDispenser extends PineconeOpDispenser { LongFunction targetFunction) { super(adapter, op, pcFunction, targetFunction); upsertRequestFunc = createUpsertRequestFunc(op); + upsertVectorFunc = createUpsertRequestVectorsFunc(op); } - /* - *float[][] upsertData = {{1.0F, 2.0F, 3.0F}, {4.0F, 5.0F, 6.0F}, {7.0F, 8.0F, 9.0F}}; - *List upsertIds = Arrays.asList("v1", "v2", "v3"); - *List upsertVectors = new ArrayList<>(); - * - *for (int i = 0; i < upsertData.length; i++) { - *upsertVectors.add(Vector.newBuilder() - *.addAllValues(Floats.asList(upsertData[i])) - *.setMetadata(Struct.newBuilder() - *.putFields("some_field", Value.newBuilder().setNumberValue(i).build()) - *.build()) - *.setId(upsertIds.get(i)) - *.build()); - * } - * - *return UpsertRequest.newBuilder() - *.addAllVectors(upsertVectors) - *.setNamespace("default-namespace") - *.build(); - */ - private LongFunction createUpsertRequestFunc(ParsedOp op) { - throw new NotSupportedException("Pinecone Upsert Request Op not yet supported"); + private LongFunction> createUpsertRequestVectorsFunc(ParsedOp op) { + Optional> baseFunc = + op.getAsOptionalFunction("upsert_vectors", List.class); + return baseFunc.>>map(listLongFunction -> l -> { + List returnVectors = new ArrayList<>(); + List> vectors = listLongFunction.apply(l); + for (Map vector : vectors) { + Vector.Builder vb = Vector.newBuilder(); + String[] rawValues = ((String) vector.get("values")).split(","); + ArrayList floatValues = new ArrayList<>(); + for (String val : rawValues) { + floatValues.add(Float.valueOf(val)); + } + vb.addAllValues(floatValues); + if (vector.containsKey("sparse_values")) { + Map sparse_values = (Map) vector.get("sparse_values"); + rawValues = ((String) sparse_values.get("values")).split(","); + floatValues = new ArrayList<>(); + for (String val : rawValues) { + floatValues.add(Float.valueOf(val)); + } + rawValues = sparse_values.get("indices").split(","); + List intValues = new ArrayList<>(); + for (String val : rawValues) { + intValues.add(Integer.valueOf(val)); + } + vb.setSparseValues(SparseValues.newBuilder() + .addAllValues(floatValues) + .addAllIndices(intValues) + .build()); + } + if (vector.containsKey("metadata")) { + Map metadata_map = new HashMap(); + BiConsumer stringToValue = (key, val) -> { + Value targetval = null; + if (val instanceof String) targetval = Value.newBuilder().setStringValue((String)val).build(); + else if (val instanceof Number) targetval = Value.newBuilder().setNumberValue((((Number) val).doubleValue())).build(); + metadata_map.put(key, targetval); + }; + Map metadata_values_map = (Map) vector.get("metadata"); + metadata_values_map.forEach(stringToValue); + vb.setMetadata(Struct.newBuilder().putAllFields(metadata_map).build()); + } + returnVectors.add(vb.build()); + } + return returnVectors; + }).orElse(null); + } + + private LongFunction createUpsertRequestFunc(ParsedOp op) { + LongFunction rFunc = l -> UpsertRequest.newBuilder(); + + Optional> nFunc = op.getAsOptionalFunction("namespace", String.class); + if (nFunc.isPresent()) { + LongFunction finalFunc = rFunc; + LongFunction af = nFunc.get(); + rFunc = l -> finalFunc.apply(l).setNamespace(af.apply(l)); + } + + return rFunc; } @Override public PineconeOp apply(long value) { - return new PineconeUpsertOp(pcFunction.apply(value).getConnection(targetFunction.apply(value)), - upsertRequestFunc.apply(value)); + UpsertRequest.Builder urb = upsertRequestFunc.apply(value); + if (upsertVectorFunc != null) { + urb.addAllVectors(upsertVectorFunc.apply(value)); + } + return new PineconeUpsertOp(pcFunction.apply(value).getConnection(targetFunction.apply(value)), urb.build()); } }