Fleshing out pgvector adapter

This commit is contained in:
Mark Wolters
2023-10-10 09:51:20 -04:00
parent aed2ee8d58
commit fd388b9ffc
7 changed files with 132 additions and 32 deletions

View File

@@ -18,6 +18,7 @@ package io.nosqlbench.adapter.pgvector;
import io.nosqlbench.adapter.pgvector.opdispensers.PGVectorExecuteOpDispenser;
import io.nosqlbench.adapter.pgvector.opdispensers.PGVectorExecuteQueryOpDispenser;
import io.nosqlbench.adapter.pgvector.opdispensers.PGVectorExecuteUpdateOpDispenser;
import io.nosqlbench.adapter.pgvector.optypes.PGVectorOp;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
@@ -75,10 +76,12 @@ public class PGVectorOpMapper implements OpMapper<PGVectorOp> {
// INSERT|UPDATE|DELETE uses 'executeUpdate' and returns an 'int'
// https://jdbc.postgresql.org/documentation/query/#performing-updates
case update ->
new PGVectorExecuteUpdateOpDispenser(adapter, connectionLongFunc, op, opType.targetFunction);
// CREATE|DROP TABLE|VIEW uses 'execute' (as opposed to 'executeQuery' which returns a 'ResultSet')
// https://jdbc.postgresql.org/documentation/query/#example54dropping-a-table-in-jdbc
case execute, update ->
case execute ->
new PGVectorExecuteOpDispenser(adapter, connectionLongFunc, op, opType.targetFunction);
};
}

View File

@@ -25,7 +25,7 @@ package io.nosqlbench.adapter.pgvector;
*/
public enum PGVectorOpType {
//See https://jdbc.postgresql.org/documentation/query/
execute, // Used for CREATE|DROP DATABASE|TABLE operation. Returns nothing.
execute, // Used for CREATE|DROP DATABASE|TABLE operation. Returns nothing in theory however the API call can return ResultSet(s) in practice.
query, // Used for SELECT operation. Returns a ResultSet object.
update // Used for updating records such as INSERT|UPDATE|DELETE. Returns the number of rows affected.
}

View File

@@ -0,0 +1,38 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.nosqlbench.adapter.pgvector.opdispensers;
import io.nosqlbench.adapter.pgvector.PGVectorSpace;
import io.nosqlbench.adapter.pgvector.optypes.PGVectorExecuteUpdateOp;
import io.nosqlbench.adapter.pgvector.optypes.PGVectorOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import java.sql.Connection;
import java.util.function.LongFunction;
public class PGVectorExecuteUpdateOpDispenser extends PGVectorBaseOpDispenser {
public PGVectorExecuteUpdateOpDispenser(DriverAdapter<PGVectorOp, PGVectorSpace> adapter, LongFunction<Connection> connectionLongFunc, ParsedOp op, LongFunction<String> targetFunction) {
super(adapter, connectionLongFunc, op, targetFunction);
}
@Override
public PGVectorOp apply(long cycle) {
return new PGVectorExecuteUpdateOp(this.connectionLongFunction.apply(cycle), this.statementLongFunction.apply(cycle), targetFunction.apply(cycle));
}
}

View File

@@ -19,8 +19,12 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class PGVectorExecuteOp extends PGVectorOp {
private static final Logger LOGGER = LogManager.getLogger(PGVectorExecuteOp.class);
@@ -32,20 +36,32 @@ public class PGVectorExecuteOp extends PGVectorOp {
}
@Override
public void run() {
public Object apply(long value) {
List<ResultSet> queryResults = new ArrayList<>();
try {
if (!statement.execute(queryString)) {
LOGGER.debug(() -> {
try {
return String.format(LOG_UPDATE_COUNT, statement.getUpdateCount());
} catch (SQLException e) {
LOGGER.error(LOG_UPDATE_COUNT_ERROR, e);
throw new RuntimeException(LOG_UPDATE_COUNT_ERROR, e);
boolean isResultSet = statement.execute(queryString);
ResultSet rs;
if (isResultSet) {
int countResults = 0;
rs = statement.getResultSet();
Objects.requireNonNull(rs);
countResults += rs.getRow();
queryResults.add(rs);
while (null != rs) {
while (statement.getMoreResults() && -1 > statement.getUpdateCount()) {
countResults += rs.getRow();
}
});
rs = statement.getResultSet();
queryResults.add(rs);
}
finalResultCount = countResults;
LOGGER.debug(() -> LOG_ROWS_PROCESSED);
}
connection.commit();
LOGGER.debug(() -> LOG_COMMIT_SUCCESS);
return queryResults;
} catch (SQLException sqlException) {
String exMsg = String.format("ERROR: [ state => %s, cause => %s, message => %s ]",
sqlException.getSQLState(), sqlException.getCause(), sqlException.getMessage());

View File

@@ -22,6 +22,8 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class PGVectorExecuteQueryOp extends PGVectorOp {
@@ -32,29 +34,13 @@ public class PGVectorExecuteQueryOp extends PGVectorOp {
}
@Override
public void run() {
public Object apply(long value) {
List<ResultSet> queryResults = new ArrayList<>();
try {
boolean isResultSet = statement.execute(queryString);
ResultSet rs;
if (isResultSet) {
int countResults = 0;
rs = statement.getResultSet();
Objects.requireNonNull(rs);
countResults += rs.getRow();
while (null != rs) {
while (statement.getMoreResults() && -1 > statement.getUpdateCount()) {
countResults += rs.getRow();
}
rs = statement.getResultSet();
}
finalResultCount = countResults;
LOGGER.debug(() -> LOG_ROWS_PROCESSED);
}
ResultSet rs = statement.executeQuery(queryString);
connection.commit();
LOGGER.debug(() -> LOG_COMMIT_SUCCESS);
return queryResults;
} catch (SQLException sqlException) {
String exMsg = String.format("ERROR: [ state => %s, cause => %s, message => %s ]",
sqlException.getSQLState(), sqlException.getCause(), sqlException.getMessage());

View File

@@ -0,0 +1,56 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.pgvector.optypes;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class PGVectorExecuteUpdateOp extends PGVectorOp {
private static final Logger LOGGER = LogManager.getLogger(PGVectorExecuteUpdateOp.class);
private static final String LOG_UPDATE_COUNT_ERROR = "Exception occurred while attempting to fetch the update count of the query operation";
private static final String LOG_UPDATE_COUNT = "Executed a normal DDL/DML (non-SELECT) operation. DML query updated [%d] records";
public PGVectorExecuteUpdateOp(Connection connection, Statement statement, String queryString) {
super(connection, statement, queryString);
}
@Override
public Object apply(long value) {
try {
finalResultCount = statement.executeUpdate(queryString);
LOGGER.debug(() -> LOG_ROWS_PROCESSED);
connection.commit();
LOGGER.debug(() -> LOG_COMMIT_SUCCESS);
return finalResultCount;
} catch (SQLException sqlException) {
String exMsg = String.format("ERROR: [ state => %s, cause => %s, message => %s ]",
sqlException.getSQLState(), sqlException.getCause(), sqlException.getMessage());
LOGGER.error(exMsg, sqlException);
throw new RuntimeException(exMsg, sqlException);
} catch (Exception ex) {
LOGGER.error(LOG_GENERIC_ERROR, ex);
throw new RuntimeException(LOG_GENERIC_ERROR, ex);
}
}
}

View File

@@ -16,6 +16,7 @@
package io.nosqlbench.adapter.pgvector.optypes;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.RunnableOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -36,7 +37,7 @@ import java.sql.Statement;
*
* @see <a href="https://github.com/brettwooldridge/HikariCP">HikariCP connection pooling</a> for details.
*/
public abstract class PGVectorOp implements RunnableOp {
public abstract class PGVectorOp implements CycleOp {
private static final Logger LOGGER = LogManager.getLogger(PGVectorOp.class);
protected static final String LOG_COMMIT_SUCCESS = "Executed the PGVector statement & committed the connection successfully";