Merge pull request #1624 from yabinmeng/main

Major NB JDBC driver updates in order to support pg-vector
This commit is contained in:
yabinmeng
2023-10-18 10:01:42 -05:00
committed by GitHub
41 changed files with 1236 additions and 1606 deletions

View File

@@ -29,7 +29,7 @@
<name>${project.artifactId}</name>
<description>
A JDBC driver for nosqlbench. This provides the ability to inject synthetic data
into a PostgreSQL® compatible database, leveraging HikariCP.
</description>
@@ -65,6 +65,14 @@
<artifactId>HikariCP</artifactId>
<version>5.0.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-collections4 -->
<!-- <dependency>-->
<!-- <groupId>org.apache.commons</groupId>-->
<!-- <artifactId>commons-collections4</artifactId>-->
<!-- <version>4.4</version>-->
<!-- </dependency>-->
</dependencies>
<build>

View File

@@ -16,8 +16,9 @@
package io.nosqlbench.adapter.jdbc;
import io.nosqlbench.adapter.jdbc.opdispensers.JDBCExecuteOpDispenser;
import io.nosqlbench.adapter.jdbc.opdispensers.JDBCExecuteQueryOpDispenser;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterInvalidParamException;
import io.nosqlbench.adapter.jdbc.opdispensers.JDBCDMLOpDispenser;
import io.nosqlbench.adapter.jdbc.opdispensers.JDBCDDLOpDispenser;
import io.nosqlbench.adapter.jdbc.optypes.JDBCOp;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
@@ -26,6 +27,7 @@ import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -47,12 +49,17 @@ public class JDBCOpMapper implements OpMapper<JDBCOp> {
@Override
public OpDispenser<? extends JDBCOp> apply(ParsedOp op) {
LongFunction<String> spaceNameF = op.getAsFunctionOr("space", "default");
LongFunction<JDBCSpace> spaceFunc = l -> spaceCache.get(spaceNameF.apply(l));
String spaceName = op.getStaticConfigOr("space", "default");
JDBCSpace jdbcSpace = spaceCache.get(spaceName);
// Since the only needed thing in the JDBCSpace is the Connection, we can short-circuit
// to it here instead of stepping down from the cycle to the space to the connection.
LongFunction<Connection> connectionLongFunc = l -> spaceCache.get(spaceNameF.apply(l)).getConnection();
int nbThreadNum = NumberUtils.toInt(op.getStaticConfig("threads", String.class));
int maxConnNum = jdbcSpace.getMaxNumConn();
if (nbThreadNum > maxConnNum) {
throw new JDBCAdapterInvalidParamException(
"JDBC connection is NOT thread safe. The total NB thread number (" + nbThreadNum +
") can NOT be greater than the maximum connection number 'num_conn' (" + maxConnNum + ")"
);
}
/*
* If the user provides a body element, then they want to provide the JSON or
@@ -62,24 +69,22 @@ public class JDBCOpMapper implements OpMapper<JDBCOp> {
if (op.isDefined("body")) {
throw new RuntimeException("This mode is reserved for later. Do not use the 'body' op field.");
} else {
TypeAndTarget<JDBCOpType, String> opType = op.getTypeAndTarget(JDBCOpType.class, String.class, "type", "stmt");
TypeAndTarget<JDBCOpType, String> opType = op.getTypeAndTarget(JDBCOpType.class, String.class);
logger.info(() -> "Using " + opType.enumId + " statement form for '" + op.getName());
return switch (opType.enumId) {
// SELECT uses 'executeQuery' and returns a 'ResultSet'
// https://jdbc.postgresql.org/documentation/query/#example51processing-a-simple-query-in-jdbc
case query ->
new JDBCExecuteQueryOpDispenser(adapter, connectionLongFunc, op, opType.targetFunction);
// INSERT|UPDATE|DELETE uses 'executeUpdate' and returns an 'int'
// https://jdbc.postgresql.org/documentation/query/#performing-updates
// CREATE|DROP TABLE|VIEW uses 'execute' (as opposed to 'executeQuery' which returns a 'ResultSet')
// https://jdbc.postgresql.org/documentation/query/#example54dropping-a-table-in-jdbc
case execute, update ->
new JDBCExecuteOpDispenser(adapter, connectionLongFunc, op, opType.targetFunction);
case ddl ->
new JDBCDDLOpDispenser(adapter, jdbcSpace, op, opType.targetFunction);
// https://jdbc.postgresql.org/documentation/query/#performing-updates
case dmlwrite ->
new JDBCDMLOpDispenser(adapter, jdbcSpace, op, false, opType.targetFunction);
// https://jdbc.postgresql.org/documentation/query/#example51processing-a-simple-query-in-jdbc
case dmlread ->
new JDBCDMLOpDispenser(adapter, jdbcSpace, op, true, opType.targetFunction);
};
}
}

View File

@@ -24,8 +24,7 @@ package io.nosqlbench.adapter.jdbc;
* @see <a href="https://www.cockroachlabs.com/docs/v22.2/sql-statements.html#data-definition-statements">CockroachDB API Reference</a>
*/
public enum JDBCOpType {
//See https://jdbc.postgresql.org/documentation/query/
execute, // Used for CREATE|DROP DATABASE|TABLE operation. Returns nothing.
query, // Used for SELECT operation. Returns a ResultSet object.
update // Used for updating records such as INSERT|UPDATE|DELETE. Returns the number of rows affected.
ddl, // Used for DDL statements (Statement). Returns boolean (success or not).
dmlwrite, // Used for DML write statements (INSERT|UPDATE|DELETE) (PreparedStatement). Returns the number of rows affected.
dmlread // Used for DML read statements (SELECT) (PreparedStatement). Returns a list of the ResultSet objects.
}

View File

@@ -18,38 +18,136 @@ package io.nosqlbench.adapter.jdbc;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterInvalidParamException;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException;
import io.nosqlbench.adapter.jdbc.utils.JDBCAdapterUtil;
import io.nosqlbench.adapter.jdbc.utils.JDBCPgVector;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
import io.nosqlbench.api.errors.OpConfigError;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class JDBCSpace implements AutoCloseable {
private final static Logger logger = LogManager.getLogger(JDBCSpace.class);
private final String spaceName;
// How many JDBC connections per NB JDBC execution.
// NOTE: Since a JDBC connection is NOT thread safe, the total NB thread count MUST be
// less than or equal to this number, so that there is at most one thread per connection.
private final static int DEFAULT_CONN_NUM = 5;
private final int maxNumConn;
// For DML statements, how many statements to put together in one batch
// - 1 : no batching (default)
// - greater than 1 : use batching
private final static int DEFAULT_DML_BATCH_NUM = 1;
private final int dmlBatchNum;
private final long totalCycleNum;
private static boolean isShuttingDown = false;
private HikariConfig hikariConfig;
private HikariDataSource hikariDataSource;
private Connection connection;
ConcurrentHashMap<String, Connection> connections = new ConcurrentHashMap<>();
public JDBCSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
this.hikariDataSource = createClient(cfg);
this.totalCycleNum = NumberUtils.toLong(cfg.getOptional("cycles").orElse("1"));
int totalThreads = NumberUtils.toInt(cfg.getOptional("threads").orElse("1"));
int numConnInput = NumberUtils.toInt(cfg.getOptional("num_conn").orElse("10"));
this.maxNumConn = Math.min(totalThreads, numConnInput);
if (this.maxNumConn < 1) {
throw new JDBCAdapterInvalidParamException(
"'num_conn' NB CLI parameter must be a positive number!"
);
}
// Must come after the 'maxNumConn' initialization and before the remaining statements!
this.initializeSpace(cfg);
this.dmlBatchNum = NumberUtils.toInt(cfg.getOptional("dml_batch").orElse("1"));
if (this.dmlBatchNum < 1) {
throw new JDBCAdapterInvalidParamException(
"'dml_batch' NB CLI parameter must be a positive number!"
);
}
// According to JDBC spec,
// - The commit behavior of executeBatch is always implementation-defined
// when an error occurs and auto-commit is true.
//
// In this adapter, we treat enabling 'autoCommit' together with batching as an error.
if ( (this.dmlBatchNum > 1) && (hikariConfig.isAutoCommit()) ) {
throw new JDBCAdapterInvalidParamException(
"Using batch, 'dml_batch'(" + this.dmlBatchNum + ") > 1, along with 'autoCommit' ON is not supported!"
);
}
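// Illustrative flow under the supported settings (hypothetical values):
//   dml_batch=10, autoCommit=false -> every 10th cycle per thread runs
//   stmt.executeBatch() followed by an explicit connection commit.
// See JDBCDMLWriteOp.apply() for the actual batch/commit path.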
if (logger.isDebugEnabled()) {
logger.debug("{} JDBC connections will be created [max(threads/{}, num_conn/{}]; " +
"dml_batch: {}, autoCommit: {}",
maxNumConn, totalThreads, numConnInput, dmlBatchNum, hikariConfig.isAutoCommit());
}
}
public Connection getConnection() {
return this.connection;
}
@Override
public void close() {
shutdownSpace();
}
public int getMaxNumConn() { return this.maxNumConn; }
public int getDmlBatchNum() { return this.dmlBatchNum; }
public long getTotalCycleNum() { return this.totalCycleNum; }
public boolean isShuttingDown() { return isShuttingDown; }
public void enterShutdownStage() { isShuttingDown = true; }
public HikariDataSource getHikariDataSource() {
return this.hikariDataSource;
}
private HikariDataSource createClient(NBConfiguration cfg) {
public Connection getConnection(String connectionName) {
Connection connection = connections.get(connectionName);
if (connection == null) {
try {
connection = hikariDataSource.getConnection();
if (logger.isDebugEnabled()) {
logger.debug("JDBC connection ({}) is successfully created: {}",
connectionName, connection);
}
// Register 'vector' type
JDBCPgVector.addVectorType(connection);
connections.put(connectionName, connection);
}
catch (Exception ex) {
String exp = "Exception occurred while attempting to create a connection using the HikariDataSource";
logger.error(exp, ex);
throw new JDBCAdapterUnexpectedException(exp);
}
}
return connection;
}
private void initializeSpace(NBConfiguration cfg) {
hikariConfig = new HikariConfig();
hikariConfig.setJdbcUrl(cfg.get("url"));
@@ -106,29 +204,63 @@ public class JDBCSpace implements AutoCloseable {
hikariConfig.addDataSourceProperty("rewriteBatchedInserts", cfg.getOrDefault("rewriteBatchedInserts", true));
// We're managing the auto-commit behavior of connections ourselves and hence disabling the auto-commit.
//Optional<Boolean> autoCommit = cfg.getOptional(Boolean.class, "autoCommit");
hikariConfig.setAutoCommit(false);
Optional<String> autoCommitOpt = cfg.getOptional("autoCommit");
boolean autoCommit = false;
if (autoCommitOpt.isPresent()) autoCommit = BooleanUtils.toBoolean(autoCommitOpt.get());
hikariConfig.setAutoCommit(autoCommit);
hikariConfig.setMaximumPoolSize(Integer.parseInt(cfg.get("maximumPoolSize")));
hikariConfig.setKeepaliveTime(Integer.parseInt(cfg.get("keepaliveTime")));
HikariDataSource hds = new HikariDataSource(hikariConfig);
// HikariCP "maximumPoolSize" parameter is ignored.
// Use the NB "num_conn" parameter instead, wth 20% extra capacity
hikariConfig.setMaximumPoolSize((int)Math.ceil(1.2*maxNumConn));
this.hikariDataSource = new HikariDataSource(hikariConfig);
}
private void shutdownSpace() {
isShuttingDown = true;
try {
this.connection = hds.getConnection();
// We're taking an opinionated approach here and managing the commit ourselves.
this.getConnection().setAutoCommit(false);
} catch (Exception ex) {
String exp = "Exception occurred while attempting to create a connection using the HikariDataSource";
logger.error(exp, ex);
throw new RuntimeException(exp, ex);
waitUntilAllOpFinished(System.currentTimeMillis());
for (Connection connection : connections.values()) {
connection.close();
}
} catch (Exception e) {
throw new JDBCAdapterUnexpectedException("Unexpected error when trying to close the JDBC connection!");
}
return hds;
hikariDataSource.close();
}
private void waitUntilAllOpFinished(long shutdownStartTimeMills) {
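// NOTE: this does not track in-flight ops; it simply pauses for a fixed window
// (timeToWaitInSec) to let them drain.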
final int timeToWaitInSec = 5;
long timeElapsedMills;
boolean continueChk;
do {
JDBCAdapterUtil.pauseCurThreadExec(1);
long curTimeMills = System.currentTimeMillis();
timeElapsedMills = curTimeMills - shutdownStartTimeMills;
continueChk = (timeElapsedMills <= (timeToWaitInSec*1000));
} while (continueChk);
logger.info(
"shutdownSpace::waitUntilAllOpFinished -- " +
"shutdown time elapsed: " + timeElapsedMills + "ms.");
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(JDBCSpace.class)
.add(Param.defaultTo("num_conn", DEFAULT_CONN_NUM)
.setDescription("The number of JDBC connections to establish. Defaults to '" + DEFAULT_CONN_NUM + "'"))
.add(Param.defaultTo("dml_batch", DEFAULT_DML_BATCH_NUM)
.setDescription("The number of DML write statements in a batch. Defaults to 1. Ignored by DML read statements!" +
DEFAULT_DML_BATCH_NUM + "' (no batch)"))
.add(Param.defaultTo("url", "jdbc:postgresql:/")
.setDescription("The connection URL used to connect to the DBMS. Defaults to 'jdbc:postgresql:/'"))
.add(Param.defaultTo("url", "jdbc:postgresql:/")
.setDescription("The connection URL used to connect to the DBMS. Defaults to 'jdbc:postgresql:/'"))
.add(Param.defaultTo("serverName", "localhost")
@@ -164,89 +296,78 @@ public class JDBCSpace implements AutoCloseable {
"It is a boolean value. Default: false. This cannot be changed."))
.add(Param.optional("connectionTimeout")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("idleTimeout")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.defaultTo("keepaliveTime", "150000")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("maxLifetime")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("connectionTestQuery")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("minimumIdle")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.defaultTo("maximumPoolSize", "40")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. Default value is 40 and cannot be changed."))
.add(Param.optional("metricRegistry")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("healthCheckRegistry")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("poolName")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("initializationFailTimeout")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("isolateInternalQueries")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("allowPoolSuspension")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("readOnly")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("registerMbeans")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("catalog")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("connectionInitSql")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("driverClassName")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("transactionIsolation")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("validationTimeout")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("leakDetectionThreshold")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("dataSource")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("schema")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("threadFactory")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("scheduledExecutor")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
"This property is not exposed and hence cannot be changed."))
.asReadOnly();
}
@Override
public void close() {
try {
this.getConnection().close();
this.getHikariDataSource().close();
} catch (Exception e) {
logger.error("auto-closeable jdbc connection threw exception in jdbc space(" + this.spaceName + "): " + e);
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,29 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.jdbc.exceptions;
public class JDBCAdapterInvalidParamException extends RuntimeException {
public JDBCAdapterInvalidParamException(String paramName, String errDesc) {
super("Invalid setting for parameter (" + paramName + "): " + errDesc);
}
public JDBCAdapterInvalidParamException(String fullErrDesc) {
super(fullErrDesc);
}
}

View File

@@ -0,0 +1,30 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.jdbc.exceptions;
public class JDBCAdapterUnexpectedException extends RuntimeException {
public JDBCAdapterUnexpectedException(String message) {
super(message);
printStackTrace();
}
public JDBCAdapterUnexpectedException(Exception e) {
super(e);
printStackTrace();
}
}

View File

@@ -0,0 +1,24 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.nosqlbench.adapter.jdbc.exceptions;
public class JDBCPgVectorException extends RuntimeException {
public JDBCPgVectorException(String errMsg) {
super("Unexpected JDBCPgVector Error: \"" + errMsg + "\"");
}
}

View File

@@ -21,44 +21,27 @@ import io.nosqlbench.adapter.jdbc.optypes.JDBCOp;
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.function.LongFunction;
public abstract class JDBCBaseOpDispenser extends BaseOpDispenser<JDBCOp, JDBCSpace> {
private static final Logger logger = LogManager.getLogger(JDBCBaseOpDispenser.class);
protected static final String ERROR_STATEMENT_CREATION =
"Error while attempting to create the jdbc statement from the connection";
protected static final String ERROR_STATEMENT_CREATION = "Error while attempting to create the jdbc statement from the connection";
protected final JDBCSpace jdbcSpace;
protected boolean isDdlStatement;
protected final boolean isPreparedStatement;
protected final String verifierKeyName;
protected final LongFunction<String> targetFunction;
protected final LongFunction<Connection> connectionLongFunction;
protected final LongFunction<Statement> statementLongFunction;
public JDBCBaseOpDispenser(DriverAdapter<JDBCOp, JDBCSpace> adapter, LongFunction<Connection> connectionLongFunc, ParsedOp op, LongFunction<String> targetFunction) {
public JDBCBaseOpDispenser(DriverAdapter<JDBCOp, JDBCSpace> adapter,
JDBCSpace jdbcSpace,
ParsedOp op) {
super(adapter, op);
this.connectionLongFunction = connectionLongFunc;
this.targetFunction = targetFunction;
this.statementLongFunction = createStmtFunc(op);
this.jdbcSpace = jdbcSpace;
this.isPreparedStatement = op.getStaticConfigOr("prepared", false);
this.verifierKeyName = op.getStaticConfigOr("verifier-key", "");
}
protected LongFunction<Statement> createStmtFunc(ParsedOp cmd) {
try {
LongFunction<Statement> basefunc = l -> {
try {
return this.connectionLongFunction.apply(l).createStatement();
} catch (SQLException e) {
throw new RuntimeException(e);
}
};
return basefunc;
} catch (Exception ex) {
logger.error(ERROR_STATEMENT_CREATION, ex);
throw new RuntimeException(ERROR_STATEMENT_CREATION, ex);
public void checkShutdownEntry(long cycle) {
if (cycle == (jdbcSpace.getTotalCycleNum()-1)) {
jdbcSpace.enterShutdownStage();
}
}
}

View File

@@ -17,25 +17,38 @@
package io.nosqlbench.adapter.jdbc.opdispensers;
import io.nosqlbench.adapter.jdbc.JDBCSpace;
import io.nosqlbench.adapter.jdbc.optypes.JDBCExecuteQueryOp;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterInvalidParamException;
import io.nosqlbench.adapter.jdbc.optypes.JDBCDDLOp;
import io.nosqlbench.adapter.jdbc.optypes.JDBCOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.util.function.LongFunction;
public class JDBCExecuteQueryOpDispenser extends JDBCBaseOpDispenser {
private static final Logger logger = LogManager.getLogger(JDBCExecuteQueryOpDispenser.class);
public class JDBCDDLOpDispenser extends JDBCBaseOpDispenser {
public JDBCExecuteQueryOpDispenser(DriverAdapter<JDBCOp, JDBCSpace> adapter, LongFunction<Connection> connectionLongFunc, ParsedOp op, LongFunction<String> targetFunction) {
super(adapter, connectionLongFunc, op, targetFunction);
private static final Logger logger = LogManager.getLogger(JDBCDDLOpDispenser.class);
private final LongFunction<String> ddlSqlStrFunc;
public JDBCDDLOpDispenser(DriverAdapter<JDBCOp, JDBCSpace> adapter,
JDBCSpace jdbcSpace,
ParsedOp op,
LongFunction<String> sqlStrFunc) {
super(adapter, jdbcSpace, op);
this.isDdlStatement = true;
this.ddlSqlStrFunc = sqlStrFunc;
if (isPreparedStatement) {
throw new JDBCAdapterInvalidParamException("DDL statements can NOT be prepared!");
}
}
@Override
public JDBCExecuteQueryOp apply(long cycle) {
return new JDBCExecuteQueryOp(this.connectionLongFunction.apply(cycle), this.statementLongFunction.apply(cycle), targetFunction.apply(cycle));
public JDBCDDLOp apply(long cycle) {
checkShutdownEntry(cycle);
String ddlSqlStr = ddlSqlStrFunc.apply(cycle);
return new JDBCDDLOp(jdbcSpace, ddlSqlStr);
}
}

View File

@@ -0,0 +1,92 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.jdbc.opdispensers;
import io.nosqlbench.adapter.jdbc.JDBCSpace;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterInvalidParamException;
import io.nosqlbench.adapter.jdbc.optypes.JDBCDMLOp;
import io.nosqlbench.adapter.jdbc.optypes.JDBCDMLReadOp;
import io.nosqlbench.adapter.jdbc.optypes.JDBCDMLWriteOp;
import io.nosqlbench.adapter.jdbc.optypes.JDBCOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.PreparedStatement;
import java.util.*;
import java.util.function.LongFunction;
public class JDBCDMLOpDispenser extends JDBCBaseOpDispenser {
private static final Logger logger = LogManager.getLogger(JDBCDMLOpDispenser.class);
private final boolean isReadStatement;
private final LongFunction<String> pStmtSqlStrFunc;
private final LongFunction<List<Object>> pStmtValListFunc;
public JDBCDMLOpDispenser(DriverAdapter<JDBCOp, JDBCSpace> adapter,
JDBCSpace jdbcSpace,
ParsedOp op,
boolean isReadStmt,
LongFunction<String> pStmtSqlStrFunc) {
super(adapter, jdbcSpace, op);
this.isDdlStatement = false;
this.isReadStatement = isReadStmt;
if (!isPreparedStatement && !isReadStatement) {
throw new JDBCAdapterInvalidParamException("DML write statements MUST be prepared!");
}
this.pStmtSqlStrFunc = pStmtSqlStrFunc;
Optional<LongFunction<String>> pStmtValListStrFuncOpt =
op.getAsOptionalFunction("prep_stmt_val_arr", String.class);
pStmtValListFunc = (l) -> {
List<Object> pStmtValListObj = new ArrayList<>();
if (pStmtValListStrFuncOpt.isPresent()) {
String pStmtValListStr = pStmtValListStrFuncOpt.get().apply(l);
String[] valList = pStmtValListStr.split(",\\s*(?![^\\(\\[]*[\\]\\)])");
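// e.g. (hypothetical input) "1, user-1, [0.1,0.2,0.3]" splits into
// ["1", "user-1", "[0.1,0.2,0.3]"]: the negative lookahead keeps commas
// inside '[...]' or '(...)' from being treated as separators.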
pStmtValListObj.addAll(Arrays.asList(valList));
}
return pStmtValListObj;
};
}
@Override
public JDBCDMLOp apply(long cycle) {
checkShutdownEntry(cycle);
if (isReadStatement) {
return new JDBCDMLReadOp(
jdbcSpace,
true,
pStmtSqlStrFunc.apply(cycle),
pStmtValListFunc.apply(cycle),
this.verifierKeyName);
}
else {
int dmlStmtBatchNum = jdbcSpace.getDmlBatchNum();
return new JDBCDMLWriteOp(
jdbcSpace,
false,
pStmtSqlStrFunc.apply(cycle),
pStmtValListFunc.apply(cycle),
dmlStmtBatchNum);
}
}
}

View File

@@ -1,38 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.jdbc.opdispensers;
import io.nosqlbench.adapter.jdbc.JDBCSpace;
import io.nosqlbench.adapter.jdbc.optypes.JDBCExecuteOp;
import io.nosqlbench.adapter.jdbc.optypes.JDBCOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import java.sql.Connection;
import java.util.function.LongFunction;
public class JDBCExecuteOpDispenser extends JDBCBaseOpDispenser {
public JDBCExecuteOpDispenser(DriverAdapter<JDBCOp, JDBCSpace> adapter, LongFunction<Connection> connectionLongFunc, ParsedOp op, LongFunction<String> targetFunction) {
super(adapter, connectionLongFunc, op, targetFunction);
}
@Override
public JDBCExecuteOp apply(long cycle) {
return new JDBCExecuteOp(this.connectionLongFunction.apply(cycle), this.statementLongFunction.apply(cycle), targetFunction.apply(cycle));
}
}

View File

@@ -0,0 +1,56 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.jdbc.optypes;
import io.nosqlbench.adapter.jdbc.JDBCSpace;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
public class JDBCDDLOp extends JDBCOp {
private static final Logger LOGGER = LogManager.getLogger(JDBCDDLOp.class);
private final String ddlStmtStr;
public JDBCDDLOp(JDBCSpace jdbcSpace, String ddlStmtStr) {
super(jdbcSpace);
this.ddlStmtStr = ddlStmtStr;
}
private Statement createDDLStatement() {
try {
return jdbcConnection.createStatement();
} catch (SQLException e) {
throw new JDBCAdapterUnexpectedException(
"Unable to create a regular (non-prepared) JDBC statement");
}
}
@Override
public Object apply(long value) {
try {
Statement stmt = createDDLStatement();
stmt.execute(ddlStmtStr);
closeStatement(stmt);
return true;
} catch (SQLException sqlException) {
throw new JDBCAdapterUnexpectedException(
"Failed to execute the DDL statement: \"" + ddlStmtStr + "\"");
}
}
}

View File

@@ -0,0 +1,147 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.jdbc.optypes;
import io.nosqlbench.adapter.jdbc.JDBCSpace;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterInvalidParamException;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCPgVectorException;
import io.nosqlbench.adapter.jdbc.utils.JDBCPgVector;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
public abstract class JDBCDMLOp extends JDBCOp {
private static final Logger LOGGER = LogManager.getLogger(JDBCDMLOp.class);
protected final boolean isReadStmt;
// If the passed-in statement string has placeholder '?', it is a prepared statement.
protected final boolean isPreparedStmt;
protected final String pStmtSqlStr;
protected final List<Object> pStmtValList;
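// One cached JDBC Statement per thread, reused across cycles; it is closed
// only at shutdown (see JDBCOp.closeStatement).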
protected static ThreadLocal<Statement> jdbcStmtTL = ThreadLocal.withInitial(() -> null);
public JDBCDMLOp(JDBCSpace jdbcSpace,
boolean isReadStmt,
String pStmtSqlStr,
List<Object> pStmtValList) {
super(jdbcSpace);
assert(StringUtils.isNotBlank(pStmtSqlStr));
this.isReadStmt = isReadStmt;
this.pStmtSqlStr = pStmtSqlStr;
this.pStmtValList = pStmtValList;
this.isPreparedStmt = StringUtils.contains(pStmtSqlStr, "?");
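// e.g. (hypothetical SQL) "SELECT v FROM t WHERE k = ?" is treated as prepared;
// "SELECT count(*) FROM t" is not.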
// NOTE:
// - The write DML statement MUST use prepared statement!
// - The read DML statement can use either a prepared or a regular statement.
if (!isReadStmt && !isPreparedStmt) {
throw new JDBCAdapterInvalidParamException(
"Write DML statement must use prepared statement format!");
}
if (isPreparedStmt) {
int expectedFieldCnt = StringUtils.countMatches(pStmtSqlStr, "?");
int actualFieldCnt = pStmtValList.size();
if (expectedFieldCnt != actualFieldCnt) {
throw new JDBCAdapterUnexpectedException(
"Provided value count (" + actualFieldCnt
+ ") doesn't match the expected field count (" + expectedFieldCnt
+ ") for the prepared statement: \"" + pStmtSqlStr + "\""
);
}
}
}
// Only applicable to a prepared statement
protected PreparedStatement setPrepStmtValues(PreparedStatement stmt, List<Object> valList) {
assert (stmt != null);
for (int i=0; i<valList.size(); i++) {
int fieldIdx = i + 1;
Object fieldValObj = valList.get(i);
assert (fieldValObj != null);
try {
// Special processing for Vector
if (fieldValObj instanceof String) {
String strObj = (String)fieldValObj;
if (StringUtils.isNotBlank(strObj)) {
strObj = strObj.trim();
// If the field value is a string in "[<float_1>, <float_2>, ..., <float_n>]" format,
// convert it to a Vector object
if (strObj.startsWith("[") && strObj.endsWith("]")) {
JDBCPgVector vector = new JDBCPgVector();
vector.setValue(strObj);
fieldValObj = vector;
}
}
}
stmt.setObject(fieldIdx, fieldValObj);
}
catch (JDBCPgVectorException | SQLException e) {
throw new RuntimeException(
"Failed to parse the prepared statement value for field[" + fieldIdx + "] " + fieldValObj);
}
}
return stmt;
}
protected void processCommit() {
try {
if (!jdbcConnection.getAutoCommit()) {
jdbcConnection.commit();
LOGGER.debug(() -> LOG_COMMIT_SUCCESS);
}
} catch (SQLException e) {
throw new JDBCAdapterUnexpectedException("Failed to process JDBC statement commit!");
}
}
protected Statement createDMLStatement() {
Statement stmt = jdbcStmtTL.get();
try {
if (stmt == null) {
if (isPreparedStmt)
stmt = jdbcConnection.prepareStatement(pStmtSqlStr);
else
stmt = jdbcConnection.createStatement();
jdbcStmtTL.set(stmt);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("A statement is created -- prepared: {}, read/write: {}, stmt: {}",
isPreparedStmt,
isReadStmt ? "read" : "write",
stmt);
}
}
return stmt;
} catch (SQLException e) {
throw new JDBCAdapterUnexpectedException(
"Unable to create a prepared JDBC statement");
}
}
}

View File

@@ -0,0 +1,97 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.jdbc.optypes;
import io.nosqlbench.adapter.jdbc.JDBCSpace;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
public class JDBCDMLReadOp extends JDBCDMLOp {
private static final Logger LOGGER = LogManager.getLogger(JDBCDMLReadOp.class);
private String verifierKeyName;
public JDBCDMLReadOp(JDBCSpace jdbcSpace,
boolean isReadStmt,
String pStmtSqlStr,
List<Object> pStmtValList,
String verifierKeyName) {
super(jdbcSpace, isReadStmt, pStmtSqlStr, pStmtValList);
this.verifierKeyName = verifierKeyName;
}
@Override
public Object apply(long value) {
Statement stmt = super.createDMLStatement();
if (isPreparedStmt) {
stmt = super.setPrepStmtValues((PreparedStatement) stmt, this.pStmtValList);
}
try {
// key string list to be used in the "Vector" relevancy score verification
List<String> verifierValueList = new ArrayList<>();
ResultSet rs;
if (!isPreparedStmt) {
rs = stmt.executeQuery(pStmtSqlStr);
while (rs.next()) {
String keyVal = rs.getString(this.verifierKeyName);
if (StringUtils.isNotBlank(keyVal)) {
verifierValueList.add(keyVal);
}
}
closeStatement(stmt);
}
else {
boolean isResultSet = ((PreparedStatement)stmt).execute();
super.processCommit();
while(true) {
if(isResultSet) {
rs = stmt.getResultSet();
while(rs.next()) {
String keyVal = rs.getString(this.verifierKeyName);
if (StringUtils.isNotBlank(keyVal)) {
verifierValueList.add(keyVal);
}
}
rs.close();
} else {
if(stmt.getUpdateCount() == -1) {
break;
}
}
isResultSet = stmt.getMoreResults();
}
closeStatement(stmt);
}
return verifierValueList;
}
catch (SQLException sqlException) {
throw new JDBCAdapterUnexpectedException(
"Failed to execute the prepared DDL stmt: \"" + pStmtSqlStr + "\", " +
"with values: \"" + pStmtValList + "\"");
}
}
}

View File

@@ -0,0 +1,96 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.jdbc.optypes;
import io.nosqlbench.adapter.jdbc.JDBCSpace;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.IntStream;
public class JDBCDMLWriteOp extends JDBCDMLOp {
private static final Logger LOGGER = LogManager.getLogger(JDBCDMLWriteOp.class);
private final int dmlStmtBatchNum;
protected final static ThreadLocal<Integer> threadBatchTrackingCntTL = ThreadLocal.withInitial(() -> 0);
public JDBCDMLWriteOp(JDBCSpace jdbcSpace,
boolean isReadStmt,
String pStmtSqlStr,
List<Object> pStmtValList,
int dmlStmtBatchNum) {
super(jdbcSpace, isReadStmt, pStmtSqlStr, pStmtValList);
this.dmlStmtBatchNum = dmlStmtBatchNum;
}
@Override
public Object apply(long value) {
int trackingCnt = threadBatchTrackingCntTL.get();
trackingCnt = trackingCnt + 1;
threadBatchTrackingCntTL.set(trackingCnt);
PreparedStatement stmt = (PreparedStatement) super.createDMLStatement();
stmt = super.setPrepStmtValues(stmt, this.pStmtValList);
try {
// No batch
if (dmlStmtBatchNum == 1) {
int result_cnt = stmt.executeUpdate();
super.processCommit();
closeStatement(stmt);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[single dml - execution] cycle:{}, result_cnt: {}, stmt: {}",
value, result_cnt, stmt);
}
return result_cnt;
}
// Use batch
else {
stmt.addBatch();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[batch dml - adding to batch] cycle:{}, stmt: {}",
value, stmt);
}
if ( (trackingCnt % dmlStmtBatchNum == 0) || jdbcSpace.isShuttingDown() ) {
int[] counts = stmt.executeBatch();
processCommit();
closeStatement(stmt);
int totalResCnt = IntStream.of(counts).sum();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[batch dml - execution] cycle:{}, total_batch_res_cnt: {}, stmt: {}",
value, totalResCnt, stmt);
}
return totalResCnt;
} else {
return 0;
}
}
}
catch (SQLException sqlException) {
throw new JDBCAdapterUnexpectedException(
"Failed to execute the prepared DDL statement: \"" + pStmtSqlStr + "\", " +
"with values: \"" + pStmtValList + "\"");
}
}
}

View File

@@ -1,59 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.jdbc.optypes;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
public class JDBCExecuteOp extends JDBCOp {
private static final Logger LOGGER = LogManager.getLogger(JDBCExecuteOp.class);
private static final String LOG_UPDATE_COUNT_ERROR = "Exception occurred while attempting to fetch the update count of the query operation";
private static final String LOG_UPDATE_COUNT = "Executed a normal DDL/DML (non-SELECT) operation. DML query updated [%d] records";
public JDBCExecuteOp(Connection connection, Statement statement, String queryString) {
super(connection, statement, queryString);
}
@Override
public void run() {
try {
if (!statement.execute(queryString)) {
LOGGER.debug(() -> {
try {
return String.format(LOG_UPDATE_COUNT, statement.getUpdateCount());
} catch (SQLException e) {
LOGGER.error(LOG_UPDATE_COUNT_ERROR, e);
throw new RuntimeException(LOG_UPDATE_COUNT_ERROR, e);
}
});
}
connection.commit();
LOGGER.debug(() -> LOG_COMMIT_SUCCESS);
} catch (SQLException sqlException) {
String exMsg = String.format("ERROR: [ state => %s, cause => %s, message => %s ]",
sqlException.getSQLState(), sqlException.getCause(), sqlException.getMessage());
LOGGER.error(exMsg, sqlException);
throw new RuntimeException(exMsg, sqlException);
} catch (Exception ex) {
LOGGER.error(LOG_GENERIC_ERROR, ex);
throw new RuntimeException(LOG_GENERIC_ERROR, ex);
}
}
}

View File

@@ -1,68 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.jdbc.optypes;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Objects;
public class JDBCExecuteQueryOp extends JDBCOp {
private static final Logger LOGGER = LogManager.getLogger(JDBCExecuteQueryOp.class);
public JDBCExecuteQueryOp(Connection connection, Statement statement, String queryString) {
super(connection, statement, queryString);
}
@Override
public void run() {
try {
boolean isResultSet = statement.execute(queryString);
ResultSet rs;
if (isResultSet) {
int countResults = 0;
rs = statement.getResultSet();
Objects.requireNonNull(rs);
countResults += rs.getRow();
while (null != rs) {
while (statement.getMoreResults() && -1 > statement.getUpdateCount()) {
countResults += rs.getRow();
}
rs = statement.getResultSet();
}
finalResultCount = countResults;
LOGGER.debug(() -> LOG_ROWS_PROCESSED);
}
connection.commit();
LOGGER.debug(() -> LOG_COMMIT_SUCCESS);
} catch (SQLException sqlException) {
String exMsg = String.format("ERROR: [ state => %s, cause => %s, message => %s ]",
sqlException.getSQLState(), sqlException.getCause(), sqlException.getMessage());
LOGGER.error(exMsg, sqlException);
throw new RuntimeException(exMsg, sqlException);
} catch (Exception ex) {
LOGGER.error(LOG_GENERIC_ERROR, ex);
throw new RuntimeException(LOG_GENERIC_ERROR, ex);
}
}
}

View File

@@ -16,48 +16,36 @@
package io.nosqlbench.adapter.jdbc.optypes;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.RunnableOp;
import io.nosqlbench.adapter.jdbc.JDBCSpace;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
/**
* References:
* https://docs.oracle.com/javase/tutorial/jdbc/basics/gettingstarted.html
* https://docs.oracle.com/javase/17/docs/api/java/sql/package-summary.html
* https://docs.oracle.com/en/java/javase/17/docs/api/java.sql/java/sql/package-summary.html
* https://jdbc.postgresql.org/documentation/query/
* https://www.cockroachlabs.com/docs/v22.2/connection-pooling.html
* https://www.cockroachlabs.com/docs/v22.2/connection-parameters#supported-options-parameters
* https://www.cockroachlabs.com/docs/v22.2/sql-statements.html#query-management-statements
* https://docs.yugabyte.com/preview/drivers-orms/java/yugabyte-jdbc/
*
* @see <a href="https://github.com/brettwooldridge/HikariCP">HikariCP connection pooling</a> for details.
*/
public abstract class JDBCOp implements RunnableOp {
public abstract class JDBCOp implements CycleOp {
private static final Logger LOGGER = LogManager.getLogger(JDBCOp.class);
protected static final String LOG_COMMIT_SUCCESS = "Executed the JDBC statement & committed the connection successfully";
protected static final String LOG_COMMIT_SUCCESS =
"Executed the JDBC statement & committed the connection successfully";
protected final String LOG_GENERIC_ERROR;
protected final Connection connection;
protected final Statement statement;
protected final String queryString;
protected final JDBCSpace jdbcSpace;
protected final Connection jdbcConnection;
protected int finalResultCount;
protected String LOG_ROWS_PROCESSED = "Total number of rows processed is [" + finalResultCount + "]";
public JDBCOp(JDBCSpace jdbcSpace) {
this.jdbcSpace = jdbcSpace;
String curThreadName = Thread.currentThread().getName();
this.jdbcConnection = this.jdbcSpace.getConnection(curThreadName);
}
/**
* @param connection
* @param statement
* @param queryString
*/
public JDBCOp(Connection connection, Statement statement, String queryString) {
this.connection = connection;
this.statement = statement;
this.queryString = queryString;
LOG_GENERIC_ERROR = String.format("Exception while attempting to run the jdbc query %s", queryString);
LOGGER.debug(() -> "Query to be executed: " + queryString);
protected void closeStatement(Statement stmt) throws SQLException {
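// Regular statements are closed right away; prepared statements are cached per
// thread (see JDBCDMLOp.jdbcStmtTL) and are only closed during shutdown.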
if (! (stmt instanceof PreparedStatement)) {
stmt.close();
} else if (jdbcSpace.isShuttingDown()) {
stmt.close();
}
}
}

View File

@@ -0,0 +1,37 @@
package io.nosqlbench.adapter.jdbc.utils;
/*
* Copyright (c) 2022 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
public class JDBCAdapterUtil {
private final static Logger logger = LogManager.getLogger(JDBCAdapterUtil.class);
public static void pauseCurThreadExec(int pauseInSec) {
if (pauseInSec > 0) {
try {
Thread.sleep(pauseInSec * 1000L);
}
catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}

View File

@@ -0,0 +1,141 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.jdbc.utils;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCPgVectorException;
import org.postgresql.PGConnection;
import org.postgresql.util.ByteConverter;
import org.postgresql.util.PGBinaryObject;
import org.postgresql.util.PGobject;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Arrays;
public class JDBCPgVector extends PGobject implements PGBinaryObject, Serializable, Cloneable {
private float[] vec;
/**
* Constructor
*/
public JDBCPgVector() {
type = "vector";
}
/**
* Constructor
*/
public JDBCPgVector(float[] v) {
this();
vec = v;
}
/**
* Constructor
*/
public JDBCPgVector(String s) {
this();
setValue(s);
}
/**
* Sets the value from a text representation of a vector
*/
public void setValue(String s) throws JDBCPgVectorException {
if (s == null) {
vec = null;
} else {
String[] sp = s.substring(1, s.length() - 1).split(",");
vec = new float[sp.length];
try {
for (int i = 0; i < sp.length; i++) {
vec[i] = Float.parseFloat(sp[i]);
}
}
catch (NumberFormatException nfe) {
throw new JDBCPgVectorException("the embedding value can't be converted to float");
}
}
}
/**
* Returns the text representation of a vector
*/
public String getValue() {
if (vec == null) {
return null;
} else {
return Arrays.toString(vec).replace(" ", "");
}
}
/**
* Returns the number of bytes for the binary representation
*/
public int lengthInBytes() {
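// 4-byte header (2-byte dimension + 2 unused bytes) plus 4 bytes per float4 element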
return vec == null ? 0 : 4 + vec.length * 4;
}
/**
* Sets the value from a binary representation of a vector
*/
public void setByteValue(byte[] value, int offset) throws JDBCPgVectorException {
int dim = ByteConverter.int2(value, offset);
int unused = ByteConverter.int2(value, offset + 2);
if (unused != 0) {
throw new JDBCPgVectorException("expected unused to be 0");
}
vec = new float[dim];
for (int i = 0; i < dim; i++) {
vec[i] = ByteConverter.float4(value, offset + 4 + i * 4);
}
}
/**
* Writes the binary representation of a vector
*/
public void toBytes(byte[] bytes, int offset) {
if (vec == null) {
return;
}
// server will error on overflow due to unconsumed buffer
// could set to Short.MAX_VALUE for friendlier error message
ByteConverter.int2(bytes, offset, vec.length);
ByteConverter.int2(bytes, offset + 2, 0);
for (int i = 0; i < vec.length; i++) {
ByteConverter.float4(bytes, offset + 4 + i * 4, vec[i]);
}
}
/**
* Returns an array
*/
public float[] toArray() {
return vec;
}
/**
* Registers the vector type
*/
public static void addVectorType(Connection conn) throws SQLException {
conn.unwrap(PGConnection.class).addDataType("vector", JDBCPgVector.class);
}
}
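
For reference, a minimal hedged usage sketch of the class above. The table/column names and the already-open pgjdbc Connection 'conn' are illustrative assumptions, not part of this changeset:

JDBCPgVector.addVectorType(conn); // register the 'vector' type once per connection
try (PreparedStatement ps = conn.prepareStatement(
"INSERT INTO items (embedding) VALUES (?)")) { // hypothetical table/column
ps.setObject(1, new JDBCPgVector(new float[] {0.1f, 0.2f, 0.3f}));
ps.executeUpdate();
}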

View File

@@ -0,0 +1,30 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.extensions.vectormath;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
public class PgvecUtils {
public static int[] getValueListForVerifierKey(List<String> values) {
int[] intArr = values.stream().mapToInt(v -> {
return Integer.parseInt(Objects.requireNonNull(v));
}).toArray();
return intArr;
}
}
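
A hedged one-line usage sketch of the helper above, with hypothetical input values:

int[] ids = PgvecUtils.getValueListForVerifierKey(List.of("3", "1", "4")); // -> {3, 1, 4}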

View File

@@ -0,0 +1,36 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.engine.extensions.vectormath;
import com.codahale.metrics.MetricRegistry;
import io.nosqlbench.api.config.LabeledScenarioContext;
import io.nosqlbench.api.extensions.ScriptingExtensionPluginInfo;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.Logger;
@Service(value = ScriptingExtensionPluginInfo.class, selector = "pgvec_utils")
public class PgvecUtilsPluginInfo implements ScriptingExtensionPluginInfo<PgvecUtils> {
@Override
public String getDescription() {
return "various methods and utilities for working with vector math in a scripted environment";
}
@Override
public PgvecUtils getExtensionObject(Logger logger, MetricRegistry metricRegistry, LabeledScenarioContext scriptContext) {
return new PgvecUtils();
}
}

View File

@@ -0,0 +1,152 @@
# run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags="block:schema" threads=AUTO cycles=4 url="jdbc:postgresql://host:port/database" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName=insectdb sslrootcert="/path/to/postgresql_certs/root.crt" -vv --show-stacktraces
min_version: "5.17.2"
description: |
A workload which reads ann-benchmarks vector data in HDF5 format and runs it against PostgreSQL with pgvector.
scenarios:
default:
####
# The following CLI parameters are required for all named scenarios:
# - schema: schema name
# - table: table name
####
###
## For DDL workload, turn on 'AutoCommit'. Turning it off will cause errors.
###
drop-tbl: run driver=jdbc tags==block:drop-tbl threads==1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" autoCommit="true"
# The following CLI parameter is needed for the 'create-tbl' named scenario:
# - dimensions: vector dimension size (MUST match the actual ANN benchmark data)
create-tbl: run driver=jdbc tags==block:create-tbl threads==1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" autoCommit="true"
#
# Vectors with up to 2,000 dimensions can be indexed.
#
# The following extra CLI parameter is needed for both 'create-vec-idx' and 'drop-vec-idx' named scenarios:
# - indexName: index name
drop-vec-idx: run driver=jdbc tags==block:drop-vec-idx threads==1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" autoCommit="true"
# The following extra CLI parameters are needed for the 'create-vec-idx' named scenario:
# - indexType: index type; valid values: 'ivfflat' or 'hnsw' (see: https://github.com/pgvector/pgvector#indexing)
# - indexOpt: index options
#   * for the 'ivfflat' index type, the option is like: "lists=<number>"
#   * for the 'hnsw' index type, the option is like: "m=<number>,ef_construction=<number>"
# - relFunc: relevancy function; valid values: 'l2' (L2 distance), 'ip' (inner product), or 'cosine' (cosine distance)
create-vec-idx: run driver=jdbc tags==block:create-vec-idx threads==1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" autoCommit="true"
###
## For DML workload, 'AutoCommit' can be off or on
## - MUST be off when batching is used !!
###
# The following extra CLI parameter is needed for both the 'vec-read' and 'vec-write' named scenarios:
# - dataset: ANN benchmark testing dataset name
#
# The following extra CLI parameter is needed for the 'vec-read' named scenario:
# - queryLimit: the number of records to return, as in the 'LIMIT <number>' clause
vec-read: run driver=jdbc tags==block:vec-read cycles===TEMPLATE(main-cycles,100) threads=AUTO url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" autoCommit="true"
# The following extra CLI parameter is needed for the 'vec-write' named scenario:
# - trainsize: the number of records to load from the dataset and insert into the table
vec-write: run driver=jdbc tags==block:vec-write cycles===TEMPLATE(trainsize) threads=AUTO url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
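# Example end-to-end sequence (a sketch; the dataset name, index settings, and
# other values shown here are placeholders, not defaults):
#   <nb_cmd> <this_workload>.yaml create-tbl dimensions=<dim> ...
#   <nb_cmd> <this_workload>.yaml create-vec-idx indexName=<name> indexType=hnsw indexOpt="m=16,ef_construction=64" relFunc=cosine ...
#   <nb_cmd> <this_workload>.yaml vec-write dataset=<dataset> trainsize=<train_records> ...
#   <nb_cmd> <this_workload>.yaml vec-read dataset=<dataset> queryLimit=100 ...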
bindings:
rw_key: ToString()
train_vector: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/train");
test_vector: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/test");
validation_set: HdfFileToIntArray("testdata/TEMPLATE(dataset).hdf5", "/neighbors");
blocks:
drop-tbl:
params:
# DDL statement must NOT be prepared
prepared: false
ops:
drop-table:
ddl: |
DROP TABLE IF EXISTS TEMPLATE(schema,public).TEMPLATE(table,pgvec);
drop-schema:
ddl: |
DROP SCHEMA IF EXISTS TEMPLATE(schema,public);
create-tbl:
params:
# DDL statement must NOT be prepared
prepared: false
ops:
create-schema:
ddl: |
CREATE SCHEMA IF NOT EXISTS TEMPLATE(schema,public);
create-table:
ddl: |
CREATE TABLE IF NOT EXISTS TEMPLATE(schema,public).TEMPLATE(table,pgvec)
(key TEXT PRIMARY KEY, value vector(<<dimensions:5>>));
drop-vec-idx:
params:
# DDL statement must NOT be prepared
prepared: false
ops:
drop-vector-index:
ddl: |
DROP INDEX IF EXISTS TEMPLATE(schema,public).TEMPLATE(indexName,TEMPLATE(dataset)-idx);
create-vec-idx:
params:
# DDL statement must NOT be prepared
prepared: false
ops:
create-vector-index:
ddl: |
CREATE INDEX IF NOT EXISTS TEMPLATE(indexName,TEMPLATE(dataset)-idx-TEMPLATE(indexType))
ON TEMPLATE(schema,public).TEMPLATE(table,pgvec)
USING TEMPLATE(indexType) (value vector_TEMPLATE(relFunc)_ops)
WITH (TEMPLATE(indexOpt));
# Using PostgreSQl upsert (INSERT ON CONFLICT statement)
vec-write:
params:
# DML write statement MUST be prepared
prepared: true
ops:
main-insert:
dmlwrite: |
INSERT INTO TEMPLATE(schema,public).TEMPLATE(table,pgvec) VALUES (?,?)
ON CONFLICT DO NOTHING;
prep_stmt_val_arr: |
{rw_key},{train_vector}
vec-read:
params:
# DML read statement can be prepared or not
prepared: true
ops:
main-select:
dmlread: |
SELECT key, (value <-> ?) as relevancy, value
FROM TEMPLATE(schema,public).TEMPLATE(table,pgvec)
ORDER BY value <-> ?
LIMIT TEMPLATE(queryLimit,100);
prep_stmt_val_arr: |
{test_vector},{test_vector}
#################################
## NOTE:
# 1) The script blocks below are ONLY relevant when vector relevancy score verification is enabled.
# 2) The "verifier-key" must match the vector data identifier column name (e.g. the primary key name);
#    right now the identifier must be of a type that can be converted to an int.
verifier-key: "key"
verifier-imports:
- io.nosqlbench.engine.extensions.vectormath.PgvecUtils
verifier-init: |
relevancy=scriptingmetrics.newRelevancyMeasures(_parsed_op);
for (int k in List.of(100)) {
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.recall("recall",k));
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.precision("precision",k));
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.F1("F1",k));
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.reciprocal_rank("RR",k));
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.average_precision("AP",k));
}
verifier: |
// driver-specific function
actual_indices=pgvec_utils.getValueListForVerifierKey(result);
// driver-agnostic function
relevancy.accept({validation_set},actual_indices);
// always return true here, since we are only "verifying"; this flow needs to be reorganized
return true;

View File

@@ -1,81 +0,0 @@
<!--
~ Copyright (c) 2023 nosqlbench
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.nosqlbench</groupId>
<artifactId>mvn-defaults</artifactId>
<version>${revision}</version>
<relativePath>../mvn-defaults</relativePath>
</parent>
<artifactId>adapter-pgvector</artifactId>
<packaging>jar</packaging>
<name>${project.artifactId}</name>
<description>
A JDBC driver for nosqlbench that includes PGVector. This provides the ability to inject synthetic data
into a PostgreSQL® compatible database leveraging HikariCP.
</description>
<dependencies>
<!-- core dependencies -->
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>adapters-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>engine-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>io.nosqlbench</groupId>
<artifactId>nb-annotations</artifactId>
<version>${revision}</version>
<scope>compile</scope>
</dependency>
<!-- https://search.maven.org/artifact/org.postgresql/postgresql -->
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.5.4</version>
</dependency>
<!-- https://search.maven.org/artifact/com/pgvector/pgvector -->
<dependency>
<groupId>com.pgvector</groupId>
<artifactId>pgvector</artifactId>
<version>0.1.3</version>
</dependency>
<!-- https://search.maven.org/artifact/com.zaxxer/HikariCP -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>5.0.1</version>
</dependency>
</dependencies>
<build>
<plugins>
</plugins>
</build>
</project>

View File

@@ -1,52 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.pgvector;
import io.nosqlbench.adapter.pgvector.optypes.PGVectorOp;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.BaseDriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.nb.annotations.Service;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.function.Function;
@Service(value = DriverAdapter.class, selector = "jdbc")
public class PGVectorDriverAdapter extends BaseDriverAdapter<PGVectorOp, PGVectorSpace> {
private final static Logger logger = LogManager.getLogger(PGVectorDriverAdapter.class);
@Override
public OpMapper<PGVectorOp> getOpMapper() {
DriverSpaceCache<? extends PGVectorSpace> spaceCache = getSpaceCache();
NBConfiguration adapterConfig = getConfiguration();
return new PGVectorOpMapper(this, adapterConfig, spaceCache);
}
@Override
public Function<String, ? extends PGVectorSpace> getSpaceInitializer(NBConfiguration cfg) {
return (s) -> new PGVectorSpace(s, cfg);
}
@Override
public NBConfigModel getConfigModel() {
return super.getConfigModel().add(PGVectorSpace.getConfigModel());
}
}

View File

@@ -1,89 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.pgvector;
import io.nosqlbench.adapter.pgvector.opdispensers.PGVectorExecuteOpDispenser;
import io.nosqlbench.adapter.pgvector.opdispensers.PGVectorExecuteQueryOpDispenser;
import io.nosqlbench.adapter.pgvector.opdispensers.PGVectorExecuteUpdateOpDispenser;
import io.nosqlbench.adapter.pgvector.optypes.PGVectorOp;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.adapters.api.activityimpl.OpDispenser;
import io.nosqlbench.adapters.api.activityimpl.OpMapper;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverSpaceCache;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import io.nosqlbench.engine.api.templating.TypeAndTarget;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.util.function.LongFunction;
public class PGVectorOpMapper implements OpMapper<PGVectorOp> {
private final static Logger logger = LogManager.getLogger(PGVectorOpMapper.class);
private final DriverAdapter adapter;
private final NBConfiguration cfg;
private final DriverSpaceCache<? extends PGVectorSpace> spaceCache;
public PGVectorOpMapper(DriverAdapter adapter, NBConfiguration cfg, DriverSpaceCache<? extends PGVectorSpace> spaceCache) {
this.adapter = adapter;
this.cfg = cfg;
this.spaceCache = spaceCache;
}
@Override
public OpDispenser<? extends PGVectorOp> apply(ParsedOp op) {
LongFunction<String> spaceNameF = op.getAsFunctionOr("space", "default");
LongFunction<PGVectorSpace> spaceFunc = l -> spaceCache.get(spaceNameF.apply(l));
// Since the only needed thing in the PGVectorSpace is the Connection, we can short-circuit
// to it here instead of stepping down from the cycle to the space to the connection.
LongFunction<Connection> connectionLongFunc = l -> spaceCache.get(spaceNameF.apply(l)).getConnection();
/*
* If the user provides a body element, then they want to provide the JSON or
* a data structure that can be converted into JSON, bypassing any further
* specialized type-checking or op-type specific features
*/
if (op.isDefined("body")) {
throw new RuntimeException("This mode is reserved for later. Do not use the 'body' op field.");
} else {
TypeAndTarget<PGVectorOpType, String> opType = op.getTypeAndTarget(PGVectorOpType.class, String.class, "type", "stmt");
logger.info(() -> "Using " + opType.enumId + " statement form for '" + op.getName());
return switch (opType.enumId) {
// SELECT uses 'executeQuery' and returns a 'ResultSet'
// https://jdbc.postgresql.org/documentation/query/#example51processing-a-simple-query-in-jdbc
case query ->
new PGVectorExecuteQueryOpDispenser(adapter, connectionLongFunc, op, opType.targetFunction);
// INSERT|UPDATE|DELETE uses 'executeUpdate' and returns an 'int'
// https://jdbc.postgresql.org/documentation/query/#performing-updates
case update ->
new PGVectorExecuteUpdateOpDispenser(adapter, connectionLongFunc, op, opType.targetFunction);
// CREATE|DROP TABLE|VIEW uses 'execute' (as opposed to 'executeQuery' which returns a 'ResultSet')
// https://jdbc.postgresql.org/documentation/query/#example54dropping-a-table-in-jdbc
case execute ->
new PGVectorExecuteOpDispenser(adapter, connectionLongFunc, op, opType.targetFunction);
};
}
}
}

View File

@@ -1,31 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.pgvector;
/**
* Op templates which are supported by the NoSQLBench PGVector driver are
* enumerated below. As an example, these command names mirror those in the
* official CockroachDB API. See the official API reference for more details.
*
* @see <a href="https://www.cockroachlabs.com/docs/v22.2/sql-statements.html#data-definition-statements">CockroachDB API Reference</a>
*/
public enum PGVectorOpType {
//See https://jdbc.postgresql.org/documentation/query/
execute, // Used for CREATE|DROP DATABASE|TABLE operations. Returns nothing in theory; in practice, however, the API call can return ResultSet(s).
query, // Used for SELECT operation. Returns a ResultSet object.
update // Used for updating records such as INSERT|UPDATE|DELETE. Returns the number of rows affected.
}

View File

@@ -1,252 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.pgvector;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import io.nosqlbench.api.config.standard.ConfigModel;
import io.nosqlbench.api.config.standard.NBConfigModel;
import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
import io.nosqlbench.api.errors.OpConfigError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.util.Optional;
public class PGVectorSpace implements AutoCloseable {
private final static Logger logger = LogManager.getLogger(PGVectorSpace.class);
private final String spaceName;
private HikariConfig hikariConfig;
private HikariDataSource hikariDataSource;
private Connection connection;
public PGVectorSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
this.hikariDataSource = createClient(cfg);
}
public Connection getConnection() {
return this.connection;
}
public HikariDataSource getHikariDataSource() {
return this.hikariDataSource;
}
private HikariDataSource createClient(NBConfiguration cfg) {
hikariConfig = new HikariConfig();
hikariConfig.setJdbcUrl(cfg.get("url"));
hikariConfig.addDataSourceProperty("serverName", cfg.get("serverName"));
Optional<String> databaseName = cfg.getOptional("databaseName");
if (databaseName.isPresent()) {
hikariConfig.addDataSourceProperty("databaseName", databaseName.get());
}
int portNumber = Integer.parseInt(cfg.get("portNumber"));
hikariConfig.addDataSourceProperty("portNumber", portNumber);
Optional<String> user = cfg.getOptional("user");
if (user.isPresent()) {
hikariConfig.setUsername(user.get());
}
Optional<String> password = cfg.getOptional("password");
if (password.isPresent()) {
if (user.isEmpty()) {
throw new OpConfigError("Both user and password options are required. Only password is supplied in this case.");
}
hikariConfig.setPassword(password.get());
} else {
if (user.isPresent()) {
throw new OpConfigError("Both user and password options are required. Only user is supplied in this case.");
}
}
Optional<Boolean> ssl = cfg.getOptional(Boolean.class, "ssl");
hikariConfig.addDataSourceProperty("ssl", ssl.orElse(false));
Optional<String> sslMode = cfg.getOptional("sslmode");
if (sslMode.isPresent()) {
hikariConfig.addDataSourceProperty("sslmode", sslMode.get());
} else {
hikariConfig.addDataSourceProperty("sslmode", "prefer");
}
Optional<String> sslCert = cfg.getOptional("sslcert");
if (sslCert.isPresent()) {
hikariConfig.addDataSourceProperty("sslcert", sslCert.get());
} /*else if(sslMode.isPresent() && (!"disable".equalsIgnoreCase(sslMode.get()) || !"allow".equalsIgnoreCase(sslMode.get())) || !"prefer".equalsIgnoreCase(sslMode.get())) {
throw new OpConfigError("When sslmode is true, sslcert should be provided.");
}*/
Optional<String> sslRootCert = cfg.getOptional("sslrootcert");
if (sslRootCert.isPresent()) {
hikariConfig.addDataSourceProperty("sslrootcert", sslRootCert.get());
}
hikariConfig.addDataSourceProperty("applicationName", cfg.get("applicationName"));
hikariConfig.addDataSourceProperty("rewriteBatchedInserts", cfg.getOrDefault("rewriteBatchedInserts", true));
// We're managing the auto-commit behavior of connections ourselves and hence disabling the auto-commit.
//Optional<Boolean> autoCommit = cfg.getOptional(Boolean.class, "autoCommit");
hikariConfig.setAutoCommit(false);
hikariConfig.setMaximumPoolSize(Integer.parseInt(cfg.get("maximumPoolSize")));
hikariConfig.setKeepaliveTime(Integer.parseInt(cfg.get("keepaliveTime")));
HikariDataSource hds = new HikariDataSource(hikariConfig);
try {
this.connection = hds.getConnection();
// We're taking an opinionated approach here and managing the commit ourselves.
this.getConnection().setAutoCommit(false);
} catch (Exception ex) {
String exp = "Exception occurred while attempting to create a connection using the HikariDataSource";
logger.error(exp, ex);
throw new RuntimeException(exp, ex);
}
return hds;
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(PGVectorSpace.class)
.add(Param.defaultTo("url", "jdbc:postgresql:/")
.setDescription("The connection URL used to connect to the DBMS. Defaults to 'jdbc:postgresql:/'"))
.add(Param.defaultTo("serverName", "localhost")
.setDescription("The host name of the server. Defaults to 'localhost'"))
.add(Param.optional("databaseName")
.setDescription("The database name. The default is to connect to a database with the same name as the user name used to connect to the server."))
// See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby & https://jdbc.postgresql.org/documentation/use/
.add(Param.defaultTo("portNumber", "5432")
.setDescription("The port number the server is listening on. Defaults to the PostgreSQL® standard port number (5432)."))
.add(Param.optional("user")
.setDescription("The database user on whose behalf the connection is being made."))
.add(Param.optional("password")
.setDescription("The database users password."))
.add(Param.optional("ssl")
.setDescription("Whether to connect using SSL. Default is false."))
.add(Param.optional("sslmode")
.setDescription("Possible values include disable , allow , prefer , require , verify-ca and verify-full." +
" require , allow and prefer all default to a non-validating SSL factory and do not check the validity of the certificate or the host name." +
" verify-ca validates the certificate, but does not verify the hostname." +
" verify-full will validate that the certificate is correct and verify the host connected to has the same hostname as the certificate." +
" Default is prefer."))
.add(Param.optional("sslcert")
.setDescription("Provide the full path for the certificate file. Defaults to defaultdir/postgresql.crt, where defaultdir is ${user.home}/.postgresql/ in *nix systems and %appdata%/postgresql/ on windows."))
.add(Param.optional("sslrootcert")
.setDescription("File name of the SSL root certificate."))
.add(Param.defaultTo("applicationName", "NoSQLBench")
.setDescription("The application name to be used. Default is 'NoSQLBench'."))
.add(Param.optional("rewriteBatchedInserts")
.setDescription("This will change batch inserts from insert into foo (col1, col2, col3) values (1, 2, 3) into insert into foo (col1, col2, col3) values (1, 2, 3), (4, 5, 6) this provides 2-3x performance improvement. " +
"Default is true"))
.add(Param.optional("autoCommit")
.setDescription("This property controls the default auto-commit behavior of connections returned from the pool. " +
"It is a boolean value. Default: false. This cannot be changed."))
.add(Param.optional("connectionTimeout")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("idleTimeout")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.defaultTo("keepaliveTime", "150000")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("maxLifetime")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("connectionTestQuery")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("minimumIdle")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.defaultTo("maximumPoolSize", "40")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. Default value is 40 and cannot be changed."))
.add(Param.optional("metricRegistry")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("healthCheckRegistry")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("poolName")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("initializationFailTimeout")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("isolateInternalQueries")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("allowPoolSuspension")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("readOnly")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("registerMbeans")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("catalog")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("connectionInitSql")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("driverClassName")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("transactionIsolation")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("validationTimeout")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("leakDetectionThreshold")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("dataSource")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("schema")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("threadFactory")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.add(Param.optional("scheduledExecutor")
.setDescription("See https://github.com/brettwooldridge/HikariCP/tree/dev#gear-configuration-knobs-baby for details. " +
"This property is not exposed and hence cannot be changed."))
.asReadOnly();
}
@Override
public void close() {
try {
this.getConnection().close();
this.getHikariDataSource().close();
} catch (Exception e) {
logger.error("auto-closeable jdbc connection threw exception in jdbc space(" + this.spaceName + "): " + e);
throw new RuntimeException(e);
}
}
}

View File

@@ -1,64 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.pgvector.opdispensers;
import io.nosqlbench.adapter.pgvector.PGVectorSpace;
import io.nosqlbench.adapter.pgvector.optypes.PGVectorOp;
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.function.LongFunction;
public abstract class PGVectorBaseOpDispenser extends BaseOpDispenser<PGVectorOp, PGVectorSpace> {
private static final Logger logger = LogManager.getLogger(PGVectorBaseOpDispenser.class);
protected static final String ERROR_STATEMENT_CREATION = "Error while attempting to create the jdbc statement from the connection";
protected final LongFunction<String> targetFunction;
protected final LongFunction<Connection> connectionLongFunction;
protected final LongFunction<Statement> statementLongFunction;
public PGVectorBaseOpDispenser(DriverAdapter<PGVectorOp, PGVectorSpace> adapter, LongFunction<Connection> connectionLongFunc, ParsedOp op, LongFunction<String> targetFunction) {
super(adapter, op);
this.connectionLongFunction = connectionLongFunc;
this.targetFunction = targetFunction;
this.statementLongFunction = createStmtFunc(op);
}
protected LongFunction<Statement> createStmtFunc(ParsedOp cmd) {
try {
LongFunction<Statement> basefunc = l -> {
try {
return this.connectionLongFunction.apply(l).createStatement();
} catch (SQLException e) {
throw new RuntimeException(e);
}
};
return basefunc;
} catch (Exception ex) {
logger.error(ERROR_STATEMENT_CREATION, ex);
throw new RuntimeException(ERROR_STATEMENT_CREATION, ex);
}
}
}

View File

@@ -1,38 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.pgvector.opdispensers;
import io.nosqlbench.adapter.pgvector.PGVectorSpace;
import io.nosqlbench.adapter.pgvector.optypes.PGVectorExecuteOp;
import io.nosqlbench.adapter.pgvector.optypes.PGVectorOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import java.sql.Connection;
import java.util.function.LongFunction;
public class PGVectorExecuteOpDispenser extends PGVectorBaseOpDispenser {
public PGVectorExecuteOpDispenser(DriverAdapter<PGVectorOp, PGVectorSpace> adapter, LongFunction<Connection> connectionLongFunc, ParsedOp op, LongFunction<String> targetFunction) {
super(adapter, connectionLongFunc, op, targetFunction);
}
@Override
public PGVectorExecuteOp apply(long cycle) {
return new PGVectorExecuteOp(this.connectionLongFunction.apply(cycle), this.statementLongFunction.apply(cycle), targetFunction.apply(cycle));
}
}

View File

@@ -1,41 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.pgvector.opdispensers;
import io.nosqlbench.adapter.pgvector.PGVectorSpace;
import io.nosqlbench.adapter.pgvector.optypes.PGVectorExecuteQueryOp;
import io.nosqlbench.adapter.pgvector.optypes.PGVectorOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.util.function.LongFunction;
public class PGVectorExecuteQueryOpDispenser extends PGVectorBaseOpDispenser {
private static final Logger logger = LogManager.getLogger(PGVectorExecuteQueryOpDispenser.class);
public PGVectorExecuteQueryOpDispenser(DriverAdapter<PGVectorOp, PGVectorSpace> adapter, LongFunction<Connection> connectionLongFunc, ParsedOp op, LongFunction<String> targetFunction) {
super(adapter, connectionLongFunc, op, targetFunction);
}
@Override
public PGVectorExecuteQueryOp apply(long cycle) {
return new PGVectorExecuteQueryOp(this.connectionLongFunction.apply(cycle), this.statementLongFunction.apply(cycle), targetFunction.apply(cycle));
}
}

View File

@@ -1,38 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package io.nosqlbench.adapter.pgvector.opdispensers;
import io.nosqlbench.adapter.pgvector.PGVectorSpace;
import io.nosqlbench.adapter.pgvector.optypes.PGVectorExecuteUpdateOp;
import io.nosqlbench.adapter.pgvector.optypes.PGVectorOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import java.sql.Connection;
import java.util.function.LongFunction;
public class PGVectorExecuteUpdateOpDispenser extends PGVectorBaseOpDispenser {
public PGVectorExecuteUpdateOpDispenser(DriverAdapter<PGVectorOp, PGVectorSpace> adapter, LongFunction<Connection> connectionLongFunc, ParsedOp op, LongFunction<String> targetFunction) {
super(adapter, connectionLongFunc, op, targetFunction);
}
@Override
public PGVectorOp apply(long cycle) {
return new PGVectorExecuteUpdateOp(this.connectionLongFunction.apply(cycle), this.statementLongFunction.apply(cycle), targetFunction.apply(cycle));
}
}

View File

@@ -1,75 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.pgvector.optypes;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class PGVectorExecuteOp extends PGVectorOp {
private static final Logger LOGGER = LogManager.getLogger(PGVectorExecuteOp.class);
private static final String LOG_UPDATE_COUNT_ERROR = "Exception occurred while attempting to fetch the update count of the query operation";
private static final String LOG_UPDATE_COUNT = "Executed a normal DDL/DML (non-SELECT) operation. DML query updated [%d] records";
public PGVectorExecuteOp(Connection connection, Statement statement, String queryString) {
super(connection, statement, queryString);
}
@Override
public Object apply(long value) {
List<ResultSet> queryResults = new ArrayList<>();
try {
boolean isResultSet = statement.execute(queryString);
if (isResultSet) {
    int countResults = 0;
    // Collect every ResultSet produced by the statement. Note that
    // getMoreResults() implicitly closes the current ResultSet before
    // advancing, and returns false once no more ResultSets remain.
    ResultSet rs = statement.getResultSet();
    while (null != rs) {
        countResults += rs.getRow();
        queryResults.add(rs);
        rs = statement.getMoreResults() ? statement.getResultSet() : null;
    }
    finalResultCount = countResults;
    LOGGER.debug(() -> LOG_ROWS_PROCESSED);
}
connection.commit();
LOGGER.debug(() -> LOG_COMMIT_SUCCESS);
return queryResults;
} catch (SQLException sqlException) {
String exMsg = String.format("ERROR: [ state => %s, cause => %s, message => %s ]",
sqlException.getSQLState(), sqlException.getCause(), sqlException.getMessage());
LOGGER.error(exMsg, sqlException);
throw new RuntimeException(exMsg, sqlException);
} catch (Exception ex) {
LOGGER.error(LOG_GENERIC_ERROR, ex);
throw new RuntimeException(LOG_GENERIC_ERROR, ex);
}
}
}

View File

@@ -1,54 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.pgvector.optypes;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class PGVectorExecuteQueryOp extends PGVectorOp {
private static final Logger LOGGER = LogManager.getLogger(PGVectorExecuteQueryOp.class);
public PGVectorExecuteQueryOp(Connection connection, Statement statement, String queryString) {
super(connection, statement, queryString);
}
@Override
public Object apply(long value) {
List<ResultSet> queryResults = new ArrayList<>();
try {
ResultSet rs = statement.executeQuery(queryString);
queryResults.add(rs);
connection.commit();
LOGGER.debug(() -> LOG_COMMIT_SUCCESS);
return queryResults;
} catch (SQLException sqlException) {
String exMsg = String.format("ERROR: [ state => %s, cause => %s, message => %s ]",
sqlException.getSQLState(), sqlException.getCause(), sqlException.getMessage());
LOGGER.error(exMsg, sqlException);
throw new RuntimeException(exMsg, sqlException);
} catch (Exception ex) {
LOGGER.error(LOG_GENERIC_ERROR, ex);
throw new RuntimeException(LOG_GENERIC_ERROR, ex);
}
}
}

View File

@@ -1,56 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.pgvector.optypes;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
public class PGVectorExecuteUpdateOp extends PGVectorOp {
private static final Logger LOGGER = LogManager.getLogger(PGVectorExecuteUpdateOp.class);
private static final String LOG_UPDATE_COUNT_ERROR = "Exception occurred while attempting to fetch the update count of the query operation";
private static final String LOG_UPDATE_COUNT = "Executed a normal DDL/DML (non-SELECT) operation. DML query updated [%d] records";
public PGVectorExecuteUpdateOp(Connection connection, Statement statement, String queryString) {
super(connection, statement, queryString);
}
@Override
public Object apply(long value) {
try {
finalResultCount = statement.executeUpdate(queryString);
LOGGER.debug(() -> LOG_ROWS_PROCESSED);
connection.commit();
LOGGER.debug(() -> LOG_COMMIT_SUCCESS);
return finalResultCount;
} catch (SQLException sqlException) {
String exMsg = String.format("ERROR: [ state => %s, cause => %s, message => %s ]",
sqlException.getSQLState(), sqlException.getCause(), sqlException.getMessage());
LOGGER.error(exMsg, sqlException);
throw new RuntimeException(exMsg, sqlException);
} catch (Exception ex) {
LOGGER.error(LOG_GENERIC_ERROR, ex);
throw new RuntimeException(LOG_GENERIC_ERROR, ex);
}
}
}

View File

@@ -1,64 +0,0 @@
/*
* Copyright (c) 2023 nosqlbench
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.nosqlbench.adapter.pgvector.optypes;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.RunnableOp;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.Statement;
/**
* References:
* https://docs.oracle.com/javase/tutorial/jdbc/basics/gettingstarted.html
* https://docs.oracle.com/javase/17/docs/api/java/sql/package-summary.html
* https://docs.oracle.com/en/java/javase/17/docs/api/java.sql/java/sql/package-summary.html
* https://jdbc.postgresql.org/documentation/query/
* https://www.cockroachlabs.com/docs/v22.2/connection-pooling.html
* https://www.cockroachlabs.com/docs/v22.2/connection-parameters#supported-options-parameters
* https://www.cockroachlabs.com/docs/v22.2/sql-statements.html#query-management-statements
* https://docs.yugabyte.com/preview/drivers-orms/java/yugabyte-jdbc/
*
* @see <a href="https://github.com/brettwooldridge/HikariCP">HikariCP connection pooling</a> for details.
*/
public abstract class PGVectorOp implements CycleOp {
private static final Logger LOGGER = LogManager.getLogger(PGVectorOp.class);
protected static final String LOG_COMMIT_SUCCESS = "Executed the PGVector statement & committed the connection successfully";
protected final String LOG_GENERIC_ERROR;
protected final Connection connection;
protected final Statement statement;
protected final String queryString;
protected int finalResultCount;
protected String LOG_ROWS_PROCESSED = "Total number of rows processed is [" + finalResultCount + "]";
/**
 * @param connection the pooled JDBC connection to operate on
 * @param statement the JDBC statement used to execute the query
 * @param queryString the SQL text to execute
 */
public PGVectorOp(Connection connection, Statement statement, String queryString) {
this.connection = connection;
this.statement = statement;
this.queryString = queryString;
LOG_GENERIC_ERROR = String.format("Exception while attempting to run the jdbc query %s", queryString);
LOGGER.debug(() -> "Query to be executed: " + queryString);
}
}

View File

@@ -1,62 +0,0 @@
# run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags="block:schema" threads=AUTO cycles=4 url="jdbc:postgresql://host:port/database" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName=insectdb sslrootcert="/path/to/postgresql_certs/root.crt" -vv --show-stacktraces
min_version: "5.17.1"
description: |
A workload with only text keys and text values. This is based on the CQL keyvalue workloads as found
in cql-keyvalue2.yaml.
scenarios:
default:
schema: run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags==block:schema threads=1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
rampup: run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags==block:rampup threads=AUTO cycles===TEMPLATE(rampup-cycles,100) url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
main: run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags==block:'main.*' threads=AUTO cycles===TEMPLATE(main-cycles,100) url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
params:
instrument: TEMPLATE(instrument,false)
bindings:
seq_key: Mod(TEMPLATE(keycount,1000000000)); ToString() -> String
seq_value: Hash(); Mod(TEMPLATE(valuecount,1000000000)); ToString() -> String
rw_key: <<keydist:Uniform(0,1000000000)->int>>; ToString() -> String
rw_value: Hash(); <<valdist:Uniform(0,1000000000)->int>>; ToString() -> String
blocks:
schema:
ops:
drop-database:
execute: |
DROP DATABASE IF EXISTS TEMPLATE(database,baselines);
create-database:
execute: |
CREATE DATABASE IF NOT EXISTS TEMPLATE(database,baselines);
drop-table:
execute: |
DROP TABLE IF EXISTS TEMPLATE(database,baselines).TEMPLATE(table,keyvalue);
create-table:
execute: |
CREATE TABLE IF NOT EXISTS TEMPLATE(database,baselines).TEMPLATE(table,keyvalue)
(key STRING PRIMARY KEY, value STRING);
rampup:
params:
ops:
rampup-insert:
update: |
INSERT INTO TEMPLATE(database,baselines).TEMPLATE(table,keyvalue)
(key, value) VALUES ({seq_key},{seq_value});
main-read:
params:
ratio: TEMPLATE(read_ratio,5)
ops:
main-select:
query: |
SELECT * FROM TEMPLATE(database,baselines).TEMPLATE(table,keyvalue) WHERE key='{rw_key}';
main-write:
params:
ratio: TEMPLATE(write_ratio,5)
ops:
main-insert:
update: |
INSERT INTO TEMPLATE(database,baselines).TEMPLATE(table,keyvalue)
(key, value) VALUES ('{rw_key}', '{rw_value}');

View File

@@ -1,157 +0,0 @@
min_version: "5.17.1"
description: |
A tabular workload with partitions, clusters, and data fields
This workload contains partitioning and cluster along with a set
of 8 fields of varying length. The field values vary in size according
to the fibonacci sequence times a base size factor of 10, with
an additional 10% variance for each field.
The read patterns have a variety of field subsets specified.
During rampup, all rows will be written partition by partition,
filling in all rows of that partition before moving on to the next.
Example: With a partition size of 1000 and 1B rows, there will be
1000000 partitions.
During main phase, the read patterns are varied with different
field sets. As well, the number of rows which will be returned
is varied between 1 and 10.
By default, reads occur at the same ratio as writes, with main
phase writes writing full rows.
You can bulk up the size of the payloads by 10x with addzeroes='0',
by 100x with addzeroes='00', and so on, but if you want to go higher
than 100x, you'll need to modify the workload with a larger reference
file in the HashedFileExtractToString(...) bindings.
scenarios:
default:
schema: run driver=jdbc tags==block:schema cycles==UNDEF threads==1
rampup: run driver=jdbc tags==block:rampup cycles===TEMPLATE(rampup-cycles,100) threads=auto
main: run driver=jdbc tags==block:'main.*' cycles===TEMPLATE(main-cycles,100) threads=auto
params:
instrument: TEMPLATE(instrument,false)
bindings:
# for ramp-up and verify phases
#
part_layout: Div(<<partsize:1000>>); ToString() -> String
clust_layout: Mod(<<partsize:1000>>); ToString() -> String
# todo: update these definitions to use the simpler 10,0.1, 20, 0.2, ...
data0: Add(10); HashedFileExtractToString('data/lorem_ipsum_full.txt',9TEMPLATE(addzeroes,),11TEMPLATE(addzeroes,))
data1: Add(20); HashedFileExtractToString('data/lorem_ipsum_full.txt',18TEMPLATE(addzeroes,),22TEMPLATE(addzeroes,))
data2: Add(30); HashedFileExtractToString('data/lorem_ipsum_full.txt',27TEMPLATE(addzeroes,),33TEMPLATE(addzeroes,))
data3: Add(40); HashedFileExtractToString('data/lorem_ipsum_full.txt',45TEMPLATE(addzeroes,),55TEMPLATE(addzeroes,))
data4: Add(50); HashedFileExtractToString('data/lorem_ipsum_full.txt',72TEMPLATE(addzeroes,),88TEMPLATE(addzeroes,))
data5: Add(60); HashedFileExtractToString('data/lorem_ipsum_full.txt',107TEMPLATE(addzeroes,),143TEMPLATE(addzeroes,))
data6: Add(70); HashedFileExtractToString('data/lorem_ipsum_full.txt',189TEMPLATE(addzeroes,),231TEMPLATE(addzeroes,))
data7: Add(80); HashedFileExtractToString('data/lorem_ipsum_full.txt',306TEMPLATE(addzeroes,),374TEMPLATE(addzeroes,))
# for main phase
# for write
part_write: Hash(); Uniform(0,TEMPLATE(partcount,100))->int; ToString() -> String
clust_write: Hash(); Add(1); Uniform(0,TEMPLATE(partsize,1000000))->int; ToString() -> String
data_write: Hash(); HashedFileExtractToString('data/lorem_ipsum_full.txt',50,150) -> String
# for read
limit: Uniform(1,10) -> int
part_read: Uniform(0,TEMPLATE(partcount,100))->int; ToString() -> String
clust_read: Add(1); Uniform(0,TEMPLATE(partsize,1000000))->int; ToString() -> String
blocks:
schema:
params:
prepared: false
ops:
#drop-database:
# execute: |
# DROP DATABASE IF EXISTS TEMPLATE(database,baselines);
create-database:
execute: |
CREATE DATABASE IF NOT EXISTS TEMPLATE(database,baselines);
drop-table:
execute: |
DROP TABLE IF EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular);
create-table:
execute: |
CREATE TABLE IF NOT EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular) (
part STRING,
clust STRING,
data0 STRING, data1 STRING, data2 STRING, data3 STRING,
data4 STRING, data5 STRING, data6 STRING, data7 STRING,
PRIMARY KEY (part,clust)
);
rampup:
params:
ops:
rampup-insert:
update: |
INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
(part,clust,data0,data1,data2,data3,data4,data5,data6,data7)
VALUES (
'{part_layout}','{clust_layout}','{data0}','{data1}','{data2}',
'{data3}','{data4}','{data5}','{data6}','{data7}'
);
verify:
params:
cl: TEMPLATE(read_cl,LOCAL_QUORUM)
ops:
verify-select:
query: |
SELECT * FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
WHERE part='{part_layout}'
AND clust='{clust_layout}'
main-read:
params:
ratio: TEMPLATE(read_ratio,1)
ops:
main-select-all:
query: |
SELECT * FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
WHERE part='{part_read}' LIMIT {limit};
main-select-01:
query: |
SELECT data0,data1 from TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
WHERE part='{part_read}' LIMIT {limit};
main-select-0246:
query: |
SELECT data0,data2,data4,data6 FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
WHERE part='{part_read}' LIMIT {limit};
main-select-1357:
query: |
SELECT data1,data3,data5,data7 FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
WHERE part='{part_read}' LIMIT {limit};
main-select-0123:
query: |
SELECT data0,data1,data2,data3 FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
WHERE part='{part_read}' LIMIT {limit};
main-select-4567:
query: |
SELECT data4,data5,data6,data7 FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
WHERE part='{part_read}' LIMIT {limit};
main-select-67:
query: |
SELECT data6,data7 FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
WHERE part='{part_read}' LIMIT {limit};
main-select:
query: |
SELECT data0,data1,data2,data3,data4,data5,data6,data7
FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
WHERE part='{part_read}' LIMIT {limit};
main-write:
params:
ratio: TEMPLATE(write_ratio,8)
ops:
main-write:
update: |
INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
(part, clust, data0,data1,data2,data3,data4,data5,data6,data7)
VALUES (
'{part_write}','{clust_write}','{data0}','{data1}','{data2}',
'{data3}','{data4}','{data5}','{data6}','{data7}'
);

View File

@@ -1,84 +0,0 @@
# java -jar nb5.jar cockroachdb-timeseries default -vv --show-stacktraces
min_version: "5.17.1"
description: |
This workload emulates a time-series data model and access patterns.
scenarios:
default:
schema: run driver=jdbc tags==block:schema cycles==UNDEF threads==1 url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
rampup: run driver=jdbc tags==block:rampup cycles===TEMPLATE(rampup-cycles,100) threads=auto url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
main: run driver=jdbc tags==block:'main.*' cycles===TEMPLATE(main-cycles,100) threads=auto url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
params:
instrument: TEMPLATE(instrument,false)
bindings:
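# Each binding below is a virtdata recipe that maps the cycle number to a synthetic field value.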
machine_id: Mod(TEMPLATE(sources,10000)); ToHashedUUID() -> java.util.UUID
sensor_name: HashedLineToString('data/variable_words.txt')
time: Mul(TEMPLATE(timespeed,100)L); Div(TEMPLATE(sources,10000)L); ToJavaInstant()
cell_timestamp: Mul(TEMPLATE(timespeed,100)L); Div(TEMPLATE(sources,10000)L); Mul(1000L)
sensor_value: Normal(0.0,5.0); Add(100.0) -> double
station_id: Div(TEMPLATE(sources,10000));Mod(TEMPLATE(stations,100)); ToHashedUUID() -> java.util.UUID
data: HashedFileExtractToString('data/lorem_ipsum_full.txt',800TEMPLATE(addzeroes,),1200TEMPLATE(addzeroes,))
blocks:
schema:
params:
ops:
#drop-database:
#execute: |
# DROP DATABASE IF EXISTS TEMPLATE(database,baselines);
create-database:
execute: |
CREATE DATABASE IF NOT EXISTS TEMPLATE(database,baselines);
drop-table:
execute: |
DROP TABLE IF EXISTS TEMPLATE(database,baselines).TEMPLATE(table,iot);
create-table:
execute: |
CREATE TABLE IF NOT EXISTS TEMPLATE(database,baselines).TEMPLATE(table,iot) (
machine_id UUID,
sensor_name STRING,
time TIMESTAMP,
sensor_value FLOAT,
station_id UUID,
data STRING,
PRIMARY KEY (machine_id, sensor_name, time)
);
rampup:
params:
ops:
insert-rampup:
update: |
INSERT INTO TEMPLATE(database,baselines).TEMPLATE(table,iot)
(machine_id, sensor_name, time, sensor_value, station_id, data)
VALUES (
'{machine_id}', '{sensor_name}', '{time}', {sensor_value}, '{station_id}', '{data}'
);
#using timestamp {cell_timestamp}
main-read:
params:
ratio: TEMPLATE(read_ratio,1)
ops:
select-read:
query: |
SELECT * FROM TEMPLATE(database,baselines).TEMPLATE(table,iot)
WHERE machine_id='{machine_id}' and sensor_name='{sensor_name}'
LIMIT TEMPLATE(limit,10);
main-write:
params:
ratio: TEMPLATE(write_ratio,9)
ops:
insert-main:
update: |
INSERT INTO TEMPLATE(database,baselines).TEMPLATE(table,iot)
(machine_id, sensor_name, time, sensor_value, station_id, data)
VALUES (
'{machine_id}', '{sensor_name}', '{time}', {sensor_value}, '{station_id}', '{data}'
);
#using timestamp {cell_timestamp}

View File

@@ -1,50 +0,0 @@
# JDBC driver
This JDBC driver leverages the [Hikari Connection Pool](https://github.com/brettwooldridge/HikariCP/wiki) for connection pooling and works with PostgreSQL®. It enables NoSQLBench-based workload generation and performance testing against any PostgreSQL-compatible database cluster, for example CockroachDB® or YugabyteDB® (YSQL API).
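Under the hood, the pooled connections can be pictured as a HikariCP `DataSource` built from the driver's connection parameters. The following is a minimal, hypothetical sketch of that wiring; the class name, parameter values, and pool size are illustrative, not the adapter's actual internals:
```java
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

import java.sql.Connection;
import java.sql.SQLException;

// Illustrative only: shows how JDBC connection parameters could feed a HikariCP pool.
public class PooledConnectionSketch {
    public static void main(String[] args) throws SQLException {
        HikariConfig config = new HikariConfig();
        // url + serverName + portNumber + databaseName combined into one JDBC URL
        config.setJdbcUrl("jdbc:postgresql://localhost:5432/defaultdb");
        config.setUsername("newuser");                      // user
        config.setPassword("CHANGE_ME");                    // password
        config.addDataSourceProperty("sslmode", "prefer");  // passed through to the PostgreSQL driver
        config.setMaximumPoolSize(5);                       // sizing is workload-dependent

        try (HikariDataSource ds = new HikariDataSource(config);
             Connection conn = ds.getConnection()) {
            System.out.println("connected, pool is " + (conn.isClosed() ? "closed" : "live"));
        }
    }
}
```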
# Executing JDBC Workload
The following is an example of invoking a JDBC workload.
```shell
<nb_cmd> run driver=jdbc workload="/path/to/workload.yaml" cycles=1000 threads=100 url="jdbc:postgresql://" serverName=localhost portNumber=5432 databaseName=defaultdb ... -vv --show-stacktraces
```
In the above NB command, the following are JDBC driver-specific parameters:
* `url`: URL of the database cluster. Default is `jdbc:postgresql://`.
* `serverName`: Default is `localhost`.
* `portNumber`: Default is `5432`.
* `databaseName`: The database name. The default is to connect to a database with the same name as the user name used to connect to the server.
Other NB engine parameters are straightforward:
* `driver`: *must* be `jdbc`.
* `threads`: depending on the workload type, the NB thread count determines how many clients are created. All clients share connections drawn from the Hikari Connection Pool.
* `*.yaml`: the NB JDBC scenario definition workload YAML file.
* `<nb_cmd>`: either `./nb` (when using the binary) or `java -jar nb5.jar`.
# Configuration
These are the main configuration options with which we can issue a query and process the results, based on the [PostgreSQL® Query](https://jdbc.postgresql.org/documentation/query/) pattern.
## Config Sources
* `execute`: Issues DDL statements such as `CREATE DATABASE|TABLE` or `DROP DATABASE|TABLE` operations, which return nothing.
* `query`: Issues DML statements such as `SELECT` operations, which return a `ResultSet` object to process.
* `update`: Issues DML statements such as `INSERT|UPDATE|DELETE` operations, which return the number of rows affected.
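Conceptually, these three op forms map onto the standard `java.sql.Statement` methods `execute`, `executeUpdate`, and `executeQuery`. Below is a minimal, hypothetical sketch of that mapping; the connection details and table name are illustrative:
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

// Illustrative only: how 'execute', 'update', and 'query' ops relate to java.sql.Statement.
public class OpFormsSketch {
    public static void main(String[] args) throws SQLException {
        String url = "jdbc:postgresql://localhost:5432/defaultdb"; // illustrative URL
        try (Connection conn = DriverManager.getConnection(url, "newuser", "CHANGE_ME");
             Statement stmt = conn.createStatement()) {

            // 'execute': DDL, no result expected
            stmt.execute("CREATE TABLE IF NOT EXISTS keyvalue (key TEXT PRIMARY KEY, value TEXT)");

            // 'update': DML returning an affected-row count
            int rows = stmt.executeUpdate("INSERT INTO keyvalue (key, value) VALUES ('k1', 'v1')");
            System.out.println(rows + " row(s) affected");

            // 'query': DML returning a ResultSet to process
            try (ResultSet rs = stmt.executeQuery("SELECT key, value FROM keyvalue")) {
                while (rs.next()) {
                    System.out.println(rs.getString("key") + " -> " + rs.getString("value"));
                }
            }
        }
    }
}
```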
## Statement Forms
The syntax for specifying these types is simplified as well: a single `type` field accepts the values `execute`, `query`, or `update`, and the raw statement goes in the `stmt` field. Alternatively, one of the type names can be used directly as the op field, with the raw statement as its value.
### Examples
Check out the default activities under the [activities.baselinesv2](./activities.baselinesv2) directory.
#### Op Template Examples
````yaml
ops:
drop-database:
type: execute
stmt: |
DROP DATABASE IF EXISTS TEMPLATE(database,baselines);
create-table:
execute: |
CREATE TABLE IF NOT EXISTS TEMPLATE(database,baselines).TEMPLATE(table,keyvalue) ( ... );
select-table:
query: |
SELECT one, two, three FROM TEMPLATE(database,baselines).TEMPLATE(table,keyvalue) WHERE ...;
update-table:
update: |
UPDATE TEMPLATE(database,baselines).TEMPLATE(table,keyvalue) SET key = 'value' WHERE ...;
````

View File

@@ -110,7 +110,6 @@
<module>adapter-kafka</module>
<module>adapter-amqp</module>
<module>adapter-jdbc</module>
<module>adapter-pgvector</module>
<module>adapter-pinecone</module>
<!-- VIRTDATA MODULES -->