diff --git a/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/GCPSpannerOpMapper.java b/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/GCPSpannerOpMapper.java index f9ba74db6..c2071da3b 100644 --- a/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/GCPSpannerOpMapper.java +++ b/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/GCPSpannerOpMapper.java @@ -55,15 +55,10 @@ public class GCPSpannerOpMapper implements OpMapper> { logger.info(() -> "Using '" + typeAndTarget.enumId + "' op type for op template '" + op.getName() + "'"); return switch (typeAndTarget.enumId) { -// case delete_index -> new GCPSpannerDeleteIndexOpDispenser(adapter, op, typeAndTarget.targetFunction); case create_table -> new GCPSpannerCreateTableOpDispenser(adapter, op, typeAndTarget.targetFunction); -// case list_indexes -> new GCPSpannerListIndexesOpDispenser(adapter, op, typeAndTarget.targetFunction); -// case upload_documents -> new GCPSpannerUploadDocumentsOpDispenser(adapter, op, typeAndTarget.targetFunction); -// case search_documents -> new GCPSpannerSearchDocumentsOpDispenser(adapter, op, typeAndTarget.targetFunction); - -// default -> throw new RuntimeException( -// "Unrecognized op type '" + typeAndTarget.enumId.name() + "' while " + "mapping parsed op " + op); + case insert_vector -> + new GCPSpannerInsertVectorOpDispenser(adapter, op, typeAndTarget.targetFunction); }; } } diff --git a/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/GCPSpannerSpace.java b/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/GCPSpannerSpace.java index 2950c42d4..abdb6a3e4 100644 --- a/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/GCPSpannerSpace.java +++ b/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/GCPSpannerSpace.java @@ -73,6 +73,18 @@ public class GCPSpannerSpace implements AutoCloseable { return dbClient; } + public DatabaseId getDatabaseId() { + return DatabaseId.of(cfg.get("project_id"), cfg.get("instance_id"), cfg.get("database_id")); + } + + public String getInstanceId() { + return cfg.get("instance_id"); + } + + public String getDatabaseIdString() { + return cfg.get("database_id"); + } + private Spanner createSpanner() { if (/*cfg.getOptional("service_account_file").isEmpty() ||*/ cfg.getOptional("database_id").isEmpty() || @@ -86,32 +98,7 @@ public class GCPSpannerSpace implements AutoCloseable { var spannerClient = SpannerOptions.newBuilder().setProjectId(projectId).build().getService(); dbAdminClient = spannerClient.getDatabaseAdminClient(); dbClient = spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId)); -// var requiredToken = cfg.getOptional("token_file").map(Paths::get).map(tokenFilePath -> { -// try { -// return Files.readAllLines(tokenFilePath).getFirst(); -// } catch (IOException e) { -// String error = "Error while reading token from file:" + tokenFilePath; -// logger.error(error, e); -// throw new RuntimeException(e); -// } -// }).orElseGet(() -> cfg.getOptional("token").orElseThrow(() -> new RuntimeException( -// "You must provide either a 'token_file' or a 'token' to configure a Azure AI Search client"))); -// logger.info(() -> "Creating new Azure AI Search Client with (masked) token/key [" -// + GCPSpannerAdapterUtils.maskDigits(requiredToken) + "], uri/endpoint [" + uri + "]"); -// -// var spannerBuilder = SpannerOptions().endpoint(uri); -// if (!requiredToken.isBlank()) { -// SpannerBuilder = SpannerBuilder.credential(new AzureKeyCredential(requiredToken)); -// } else { -// TokenCredential tokenCredential = new DefaultAzureCredentialBuilder().build(); -// SpannerBuilder = SpannerBuilder.credential(tokenCredential); -// } -// // Should we leave these below to leverage the SearchServiceVersion.getLatest()? -// String apiVersion = cfg.getOptional("api_version").orElse(SearchServiceVersion.V2024_07_01.name()); -// logger.warn( -// () -> "Latest search service version supported by this client is '" + SearchServiceVersion.getLatest() -// + "', but we're using '" + apiVersion + "' version. Ignore this warning if both are same."); return spannerClient; } @@ -127,6 +114,9 @@ public class GCPSpannerSpace implements AutoCloseable { } @Override public void close() throws Exception { + if (spanner != null) { + spanner.close(); + } spanner = null; } } diff --git a/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/opdispensers/GCPSpannerBaseOpDispenser.java b/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/opdispensers/GCPSpannerBaseOpDispenser.java index 70ce58d63..0f9c37a59 100644 --- a/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/opdispensers/GCPSpannerBaseOpDispenser.java +++ b/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/opdispensers/GCPSpannerBaseOpDispenser.java @@ -23,14 +23,17 @@ import io.nosqlbench.adapter.gcpspanner.GCPSpannerSpace; import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.adapters.api.templating.ParsedOp; -public class GCPSpannerBaseOpDispenser extends BaseOpDispenser, GCPSpannerSpace> { +import java.util.function.LongFunction; - protected GCPSpannerBaseOpDispenser(DriverAdapter, ? extends GCPSpannerSpace> adapter, ParsedOp op) { +public abstract class GCPSpannerBaseOpDispenser extends BaseOpDispenser, GCPSpannerSpace> { + protected final LongFunction targetFunction; + protected final LongFunction spaceFunction; + + protected GCPSpannerBaseOpDispenser(DriverAdapter, GCPSpannerSpace> adapter, ParsedOp op, + LongFunction targetFunction) { super(adapter, op); + this.targetFunction = targetFunction; + this.spaceFunction = adapter.getSpaceFunc(op); } - @Override - public GCPSpannerBaseOp getOp(long value) { - return null; - } } diff --git a/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/opdispensers/GCPSpannerCreateTableOpDispenser.java b/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/opdispensers/GCPSpannerCreateTableOpDispenser.java index 1febef2f7..56cdc4692 100644 --- a/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/opdispensers/GCPSpannerCreateTableOpDispenser.java +++ b/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/opdispensers/GCPSpannerCreateTableOpDispenser.java @@ -17,13 +17,40 @@ package io.nosqlbench.adapter.gcpspanner.opdispensers; +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseAdminClient; import io.nosqlbench.adapter.gcpspanner.GCPSpannerDriverAdapter; +import io.nosqlbench.adapter.gcpspanner.GCPSpannerSpace; +import io.nosqlbench.adapter.gcpspanner.ops.GCPSpannerBaseOp; +import io.nosqlbench.adapter.gcpspanner.ops.GCPSpannerCreateTableOp; import io.nosqlbench.adapters.api.templating.ParsedOp; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.function.LongFunction; public class GCPSpannerCreateTableOpDispenser extends GCPSpannerBaseOpDispenser { + private static final Logger logger = LogManager.getLogger(GCPSpannerCreateTableOpDispenser.class); + private final LongFunction opFunction; + public GCPSpannerCreateTableOpDispenser(GCPSpannerDriverAdapter adapter, ParsedOp op, LongFunction targetFunction) { - super(adapter, op); + super(adapter, op, targetFunction); + this.opFunction = createOpFunction(op); + } + + private LongFunction createOpFunction(ParsedOp op) { + + return (l) -> new GCPSpannerCreateTableOp( + spaceFunction.apply(l).getSpanner(), + l, + op.getAsRequiredFunction("DDL", String.class).apply(l), + spaceFunction.apply(l).getDbAdminClient(), + spaceFunction.apply(l).getDbAdminClient().getDatabase(spaceFunction.apply(l).getInstanceId(), spaceFunction.apply(l).getDatabaseIdString()) + ); + } + + @Override + public GCPSpannerBaseOp getOp(long value) { + return opFunction.apply(value); } } diff --git a/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/opdispensers/GCPSpannerInsertVectorOpDispenser.java b/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/opdispensers/GCPSpannerInsertVectorOpDispenser.java new file mode 100644 index 000000000..aaa449991 --- /dev/null +++ b/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/opdispensers/GCPSpannerInsertVectorOpDispenser.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.nosqlbench.adapter.gcpspanner.opdispensers; + +import com.google.cloud.spanner.Mutation; +import io.nosqlbench.adapter.gcpspanner.GCPSpannerDriverAdapter; +import io.nosqlbench.adapter.gcpspanner.ops.GCPSpannerBaseOp; +import io.nosqlbench.adapter.gcpspanner.ops.GCPSpannerInsertVectorOp; +import io.nosqlbench.adapters.api.templating.ParsedOp; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.function.LongFunction; + +public class GCPSpannerInsertVectorOpDispenser extends GCPSpannerBaseOpDispenser { + private static final Logger logger = LogManager.getLogger(GCPSpannerInsertVectorOpDispenser.class); + private final LongFunction opFunction; + + public GCPSpannerInsertVectorOpDispenser(GCPSpannerDriverAdapter adapter, ParsedOp op, LongFunction targetFunction) { + super(adapter, op, targetFunction); + this.opFunction = createOpFunction(op); + } + + private LongFunction createOpFunction(ParsedOp op) { + LongFunction vectorF= op.getAsRequiredFunction("vector", float[].class); + + return (l) -> new GCPSpannerInsertVectorOp( + spaceFunction.apply(l).getSpanner(), + l, + Mutation.newInsertBuilder(op.getStaticValue("table", java.lang.String.class)) + .set(op.getStaticValue("pkey", java.lang.String.class)).to(l) + .set("VectorData").toFloat32Array(vectorF.apply(l)) + .build(), + spaceFunction.apply(l).getDbClient() + ); + } + + @Override + public GCPSpannerBaseOp getOp(long value) { + return opFunction.apply(value); + } +} diff --git a/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/ops/GCPSpannerCreateTableOp.java b/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/ops/GCPSpannerCreateTableOp.java new file mode 100644 index 000000000..166bd754b --- /dev/null +++ b/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/ops/GCPSpannerCreateTableOp.java @@ -0,0 +1,50 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.nosqlbench.adapter.gcpspanner.ops; + +import com.google.api.gax.longrunning.OperationFuture; +import com.google.cloud.spanner.*; +import com.google.common.collect.ImmutableList; +import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; + +public class GCPSpannerCreateTableOp extends GCPSpannerBaseOp { + private final String createTableStatement; + private final DatabaseAdminClient dbAdminClient; + private final Database db; + + public GCPSpannerCreateTableOp(Spanner searchIndexClient, Long requestParam, String createTableStatement, + DatabaseAdminClient dbAdminClient, Database db) { + super(searchIndexClient, requestParam); + this.createTableStatement = createTableStatement; + this.dbAdminClient = dbAdminClient; + this.db = db; + } + + @Override + public Object applyOp(long value) { + OperationFuture operation = dbAdminClient.updateDatabaseDdl( + db, + ImmutableList.of(createTableStatement), + null); + try { + return operation.get(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/ops/GCPSpannerInsertVectorOp.java b/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/ops/GCPSpannerInsertVectorOp.java new file mode 100644 index 000000000..666035b10 --- /dev/null +++ b/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/ops/GCPSpannerInsertVectorOp.java @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2024 nosqlbench + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package io.nosqlbench.adapter.gcpspanner.ops; + +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.Mutation; + +import java.util.Arrays; +import java.util.Collections; + +public class GCPSpannerInsertVectorOp extends GCPSpannerBaseOp { + private final Mutation mutation; + private final DatabaseClient dbClient; + + public GCPSpannerInsertVectorOp(Spanner searchIndexClient, Long requestParam, Mutation mutation, DatabaseClient dbClient) { + super(searchIndexClient, requestParam); + this.mutation = mutation; + this.dbClient = dbClient; + } + + @Override + public Object applyOp(long value) { + return dbClient.write(Collections.singletonList(mutation)); + } +} diff --git a/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/types/GCPSpannerOpType.java b/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/types/GCPSpannerOpType.java index 32797ba7a..e5ee95281 100644 --- a/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/types/GCPSpannerOpType.java +++ b/nb-adapters/adapter-gcp-spanner/src/main/java/io/nosqlbench/adapter/gcpspanner/types/GCPSpannerOpType.java @@ -21,4 +21,5 @@ package io.nosqlbench.adapter.gcpspanner.types; */ public enum GCPSpannerOpType { create_table, + insert_vector, }