diff --git a/adapter-jdbc/pom.xml b/adapter-jdbc/pom.xml index f2900f257..878396c1d 100644 --- a/adapter-jdbc/pom.xml +++ b/adapter-jdbc/pom.xml @@ -51,7 +51,7 @@ org.postgresql postgresql - 42.5.2 + 42.6.0 diff --git a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/JDBCOpMapper.java b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/JDBCOpMapper.java index d4d793f21..e307c36f7 100644 --- a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/JDBCOpMapper.java +++ b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/JDBCOpMapper.java @@ -52,15 +52,6 @@ public class JDBCOpMapper implements OpMapper { String spaceName = op.getStaticConfigOr("space", "default"); JDBCSpace jdbcSpace = spaceCache.get(spaceName); - int nbThreadNum = NumberUtils.toInt(op.getStaticConfig("threads", String.class)); - int maxConnNum = jdbcSpace.getMaxNumConn(); - if (nbThreadNum > maxConnNum) { - throw new JDBCAdapterInvalidParamException( - "JDBC connection is NOT thread safe. The total NB thread number (" + nbThreadNum + - ") can NOT be greater than the maximum connection number 'num_conn' (" + maxConnNum + ")" - ); - } - /* * If the user provides a body element, then they want to provide the JSON or * a data structure that can be converted into JSON, bypassing any further diff --git a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/JDBCSpace.java b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/JDBCSpace.java index 058bea758..533626a3b 100644 --- a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/JDBCSpace.java +++ b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/JDBCSpace.java @@ -20,8 +20,6 @@ import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariDataSource; import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterInvalidParamException; import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException; -import io.nosqlbench.adapter.jdbc.utils.JDBCAdapterUtil; -import io.nosqlbench.adapter.jdbc.utils.JDBCPgVector; import io.nosqlbench.api.config.standard.ConfigModel; import io.nosqlbench.api.config.standard.NBConfigModel; import io.nosqlbench.api.config.standard.NBConfiguration; @@ -33,12 +31,9 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.sql.Connection; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; public class JDBCSpace implements AutoCloseable { private final static Logger logger = LogManager.getLogger(JDBCSpace.class); @@ -48,59 +43,38 @@ public class JDBCSpace implements AutoCloseable { // NOTE: Since JDBC connection is NOT thread-safe, the total NB threads MUST be less // than or equal to this number. This is to make sure one thread per connection. 
private final static int DEFAULT_CONN_NUM = 5; - private final int maxNumConn; + private int maxNumConn = DEFAULT_CONN_NUM; - // For DML statements, how many statements to put together in one batch + // For DML write statements, how many statements to put together in one batch // - 1 : no batch (default) // - positive number: using batch private final static int DEFAULT_DML_BATCH_NUM = 1; - private final int dmlBatchNum; + private int dmlBatchNum = DEFAULT_DML_BATCH_NUM; - private final long totalCycleNum; - private static boolean isShuttingDown = false; + private long totalCycleNum; + private int totalThreadNum; + private boolean autoCommitCLI; - private HikariConfig hikariConfig; + private boolean useHikariCP; + private final HikariConfig connConfig = new HikariConfig(); private HikariDataSource hikariDataSource; - ConcurrentHashMap connections = new ConcurrentHashMap<>(); + // Maintain a client-side pooling just to make sure the allocated connections can + // be reclaimed quickly, instead of waiting for Hikari pooling to reclaim it eventually + public record ConnectionCacheKey(String connName) { + } + private final ConcurrentHashMap connections = new ConcurrentHashMap<>(); public JDBCSpace(String spaceName, NBConfiguration cfg) { this.spaceName = spaceName; - this.totalCycleNum = NumberUtils.toLong(cfg.getOptional("cycles").orElse("1")); - int totalThreads = NumberUtils.toInt(cfg.getOptional("threads").orElse("1")); - int numConnInput = NumberUtils.toInt(cfg.getOptional("num_conn").orElse("10")); - this.maxNumConn = Math.min(totalThreads, numConnInput); - if (this.maxNumConn < 1) { - throw new JDBCAdapterInvalidParamException( - "'num_conn' NB CLI parameter must be a positive number!" - ); - } - - // Must be after the 'maxNumConn' statements and before the rest of the remaining statements! this.initializeSpace(cfg); - this.dmlBatchNum = NumberUtils.toInt(cfg.getOptional("dml_batch").orElse("1")); - if (this.dmlBatchNum < 1) { - throw new JDBCAdapterInvalidParamException( - "'dml_batch' NB CLI parameter must be a positive number!" - ); - } - // According to JDBC spec, - // - The commit behavior of executeBatch is always implementation-defined - // when an error occurs and auto-commit is true. - // // In this adapter, we treat it as an error if 'autoCommit' is ON and using batch at the same time. - if ( (this.dmlBatchNum > 1) && (hikariConfig.isAutoCommit()) ) { + if ( (this.dmlBatchNum > 1) && isAutoCommit() ) { throw new JDBCAdapterInvalidParamException( "Using batch, 'dml_batch'(" + this.dmlBatchNum + ") > 1, along with 'autoCommit' ON is not supported!" 
); } - - if (logger.isDebugEnabled()) { - logger.debug("{} JDBC connections will be created [max(threads/{}, num_conn/{}]; " + - "dml_batch: {}, autoCommit: {}", - maxNumConn, totalThreads, numConnInput, dmlBatchNum, hikariConfig.isAutoCommit()); - } } @Override @@ -108,62 +82,67 @@ public class JDBCSpace implements AutoCloseable { shutdownSpace(); } - public int getMaxNumConn() { return this.maxNumConn; } + public int getMaxNumConn() { return maxNumConn; } + public void setMaxNumConn(int i) { maxNumConn = i; } - public int getDmlBatchNum() { return this.dmlBatchNum; } + public int getDmlBatchNum() { return dmlBatchNum; } - public long getTotalCycleNum() { return this.totalCycleNum; } + public long getTotalCycleNum() { return totalCycleNum; } + public int getTotalThreadNum() { return totalThreadNum; } - public boolean isShuttingDown() { return isShuttingDown; } - public void enterShutdownStage() { isShuttingDown = true; } - - - public HikariDataSource getHikariDataSource() { - return this.hikariDataSource; + public boolean isAutoCommit() { + if (useHikariCP) + return connConfig.isAutoCommit(); + else + return this.autoCommitCLI; } + public boolean useHikariCP() { return useHikariCP; } + public HikariConfig getConnConfig() { return connConfig; } - public Connection getConnection(String connectionName) { - Connection connection = connections.get(connectionName); - if (connection == null) { - try { - connection = hikariDataSource.getConnection(); - if (logger.isDebugEnabled()) { - logger.debug("JDBC connection ({}) is successfully created: {}", - connectionName, connection); - } - // Register 'vector' type - JDBCPgVector.addVectorType(connection); + public HikariDataSource getHikariDataSource() { return hikariDataSource; } - connections.put(connectionName, connection); - } - catch (Exception ex) { - String exp = "Exception occurred while attempting to create a connection using the HikariDataSource"; - logger.error(exp, ex); - throw new JDBCAdapterUnexpectedException(exp); - } - } - - return connection; + public Connection getConnection(ConnectionCacheKey key, Supplier connectionSupplier) { + return connections.computeIfAbsent(key, __ -> connectionSupplier.get()); } private void initializeSpace(NBConfiguration cfg) { - hikariConfig = new HikariConfig(); + // + // NOTE: Although Hikari connection pooling looks like a good idea, in my testing it showed some strange behaviors, such as: + // 1) failing to allocate connections while the target server was completely working fine, + // e.g. it failed consistently on an m5d.4xlarge testing bed but not on my mac; + // 2) not really respecting the 'max_connections' setting; + // 3) connection creation through Hikari also appearing to be slow. + // + // Therefore, the `use_hikaricp` option controls whether to use Hikari connection pooling. When it is + // set to 'false', the adapter uses its own (simple) connection management and creates connections + // directly via the JDBC driver's `DriverManager`.
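For readers skimming this hunk, here is a minimal standalone sketch of the two connection paths the note above describes (pooled through HikariCP versus created directly with DriverManager), mirroring how JDBCOp.getConnection consumes connConfig later in this diff. The class name, helper name, and null guards are illustrative assumptions, not code from the adapter.

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;

public final class ConnectionPathSketch {
    // Illustrative helper (not part of this PR): branch between the two connection strategies.
    static Connection openConnection(boolean useHikariCP, HikariConfig connConfig,
                                     HikariDataSource hikariDataSource) throws Exception {
        if (useHikariCP) {
            // Pooled path: HikariCP hands out a connection from its pool.
            return hikariDataSource.getConnection();
        }
        // Direct path: build the connection straight from the JDBC driver, reusing the HikariConfig values.
        Properties props = new Properties();
        props.putAll(connConfig.getDataSourceProperties());
        if (connConfig.getUsername() != null) props.put("user", connConfig.getUsername());
        if (connConfig.getPassword() != null) props.put("password", connConfig.getPassword());
        return DriverManager.getConnection(connConfig.getJdbcUrl(), props);
    }
}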
+ // + this.useHikariCP = BooleanUtils.toBoolean(cfg.getOptional("use_hikaricp").orElse("true")); + this.autoCommitCLI = BooleanUtils.toBoolean(cfg.getOptional("autoCommit").orElse("true")); + this.dmlBatchNum = NumberUtils.toInt(cfg.getOptional("dml_batch").orElse("1")); + if (this.dmlBatchNum < 0) dmlBatchNum = 1; + logger.info("CLI input parameters -- useHikariCP:{}, autoCommitCLI:{}, dmlBatchNum:{}", + useHikariCP, autoCommitCLI, dmlBatchNum); - hikariConfig.setJdbcUrl(cfg.get("url")); - hikariConfig.addDataSourceProperty("serverName", cfg.get("serverName")); + this.totalCycleNum = NumberUtils.toLong(cfg.getOptional("cycles").orElse("1")); + this.totalThreadNum = NumberUtils.toInt(cfg.getOptional("threads").orElse("1")); + + connConfig.setJdbcUrl(cfg.get("url")); + connConfig.addDataSourceProperty("serverName", cfg.get("serverName")); Optional databaseName = cfg.getOptional("databaseName"); if (databaseName.isPresent()) { - hikariConfig.addDataSourceProperty("databaseName", databaseName.get()); + connConfig.addDataSourceProperty("databaseName", databaseName.get()); } int portNumber = Integer.parseInt(cfg.get("portNumber")); - hikariConfig.addDataSourceProperty("portNumber", portNumber); + connConfig.addDataSourceProperty("portNumber", portNumber); Optional user = cfg.getOptional("user"); if (user.isPresent()) { - hikariConfig.setUsername(user.get()); + connConfig.setUsername(user.get()); } Optional password = cfg.getOptional("password"); @@ -171,7 +150,7 @@ public class JDBCSpace implements AutoCloseable { if (user.isEmpty()) { throw new OpConfigError("Both user and password options are required. Only password is supplied in this case."); } - hikariConfig.setPassword(password.get()); + connConfig.setPassword(password.get()); } else { if (user.isPresent()) { throw new OpConfigError("Both user and password options are required. 
Only user is supplied in this case."); @@ -179,77 +158,56 @@ public class JDBCSpace implements AutoCloseable { } Optional ssl = cfg.getOptional(Boolean.class, "ssl"); - hikariConfig.addDataSourceProperty("ssl", ssl.orElse(false)); + connConfig.addDataSourceProperty("ssl", ssl.orElse(false)); Optional sslMode = cfg.getOptional("sslmode"); if (sslMode.isPresent()) { - hikariConfig.addDataSourceProperty("sslmode", sslMode.get()); + connConfig.addDataSourceProperty("sslmode", sslMode.get()); } else { - hikariConfig.addDataSourceProperty("sslmode", "prefer"); + connConfig.addDataSourceProperty("sslmode", "prefer"); } Optional sslCert = cfg.getOptional("sslcert"); if (sslCert.isPresent()) { - hikariConfig.addDataSourceProperty("sslcert", sslCert.get()); + connConfig.addDataSourceProperty("sslcert", sslCert.get()); } /*else if(sslMode.isPresent() && (!"disable".equalsIgnoreCase(sslMode.get()) || !"allow".equalsIgnoreCase(sslMode.get())) || !"prefer".equalsIgnoreCase(sslMode.get())) { throw new OpConfigError("When sslmode is true, sslcert should be provided."); }*/ Optional sslRootCert = cfg.getOptional("sslrootcert"); if (sslRootCert.isPresent()) { - hikariConfig.addDataSourceProperty("sslrootcert", sslRootCert.get()); + connConfig.addDataSourceProperty("sslrootcert", sslRootCert.get()); } - hikariConfig.addDataSourceProperty("applicationName", cfg.get("applicationName")); - hikariConfig.addDataSourceProperty("rewriteBatchedInserts", cfg.getOrDefault("rewriteBatchedInserts", true)); + connConfig.addDataSourceProperty("applicationName", cfg.get("applicationName")); + connConfig.addDataSourceProperty("rewriteBatchedInserts", cfg.getOrDefault("rewriteBatchedInserts", true)); - // We're managing the auto-commit behavior of connections ourselves and hence disabling the auto-commit. - Optional autoCommitOpt = cfg.getOptional("autoCommit"); - boolean autoCommit = false; - if (autoCommitOpt.isPresent()) autoCommit = BooleanUtils.toBoolean(autoCommitOpt.get()); - hikariConfig.setAutoCommit(autoCommit); - - hikariConfig.setKeepaliveTime(Integer.parseInt(cfg.get("keepaliveTime"))); - - // HikariCP "maximumPoolSize" parameter is ignored. 
+ connConfig.setKeepaliveTime(Integer.parseInt(cfg.get("keepaliveTime"))); // Use the NB "num_conn" parameter instead, wth 20% extra capacity - hikariConfig.setMaximumPoolSize((int)Math.ceil(1.2*maxNumConn)); + connConfig.setMaximumPoolSize((int) Math.ceil(1.2 * maxNumConn)); - this.hikariDataSource = new HikariDataSource(hikariConfig); + if (useHikariCP) { + this.hikariDataSource = new HikariDataSource(connConfig); + logger.info("hikariDataSource is created : {}", hikariDataSource); + } } private void shutdownSpace() { - isShuttingDown = true; - try { - waitUntilAllOpFinished(System.currentTimeMillis()); - + logger.info("Shutting down JDBCSpace -- total {} of connections is being closed ...", connections.size()); for (Connection connection : connections.values()) { + if (logger.isDebugEnabled()) { + logger.debug("Close connection : {}", connection); + } connection.close(); } } catch (Exception e) { throw new JDBCAdapterUnexpectedException("Unexpected error when trying to close the JDBC connection!"); } - hikariDataSource.close(); - } - - private void waitUntilAllOpFinished(long shutdownStartTimeMills) { - final int timeToWaitInSec = 5; - long timeElapsedMills; - boolean continueChk; - - do { - JDBCAdapterUtil.pauseCurThreadExec(1); - - long curTimeMills = System.currentTimeMillis(); - timeElapsedMills = curTimeMills - shutdownStartTimeMills; - continueChk = (timeElapsedMills <= (timeToWaitInSec*1000)); - } while (continueChk); - - logger.info( - "shutdownSpace::waitUntilAllOpFinished -- " + - "shutdown time elapsed: " + timeElapsedMills + "ms."); + if (hikariDataSource != null) { + hikariDataSource.close(); + } } public static NBConfigModel getConfigModel() { @@ -259,8 +217,8 @@ public class JDBCSpace implements AutoCloseable { .add(Param.defaultTo("dml_batch", DEFAULT_DML_BATCH_NUM) .setDescription("The number of DML write statements in a batch. Defaults to 1. Ignored by DML read statements!" + DEFAULT_DML_BATCH_NUM + "' (no batch)")) - .add(Param.defaultTo("url", "jdbc:postgresql:/") - .setDescription("The connection URL used to connect to the DBMS. Defaults to 'jdbc:postgresql:/'")) + .add(Param.defaultTo("use_hikaricp", "true") + .setDescription("Whether to use Hikari connection pooling (default: true)!")) .add(Param.defaultTo("url", "jdbc:postgresql:/") .setDescription("The connection URL used to connect to the DBMS. 
Defaults to 'jdbc:postgresql:/'")) .add(Param.defaultTo("serverName", "localhost") diff --git a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/opdispensers/JDBCBaseOpDispenser.java b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/opdispensers/JDBCBaseOpDispenser.java index e5c520bbc..9abfe45bc 100644 --- a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/opdispensers/JDBCBaseOpDispenser.java +++ b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/opdispensers/JDBCBaseOpDispenser.java @@ -17,10 +17,12 @@ package io.nosqlbench.adapter.jdbc.opdispensers; import io.nosqlbench.adapter.jdbc.JDBCSpace; +import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterInvalidParamException; import io.nosqlbench.adapter.jdbc.optypes.JDBCOp; import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser; import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.adapters.api.templating.ParsedOp; +import org.apache.commons.lang3.math.NumberUtils; public abstract class JDBCBaseOpDispenser extends BaseOpDispenser { protected static final String ERROR_STATEMENT_CREATION = @@ -39,9 +41,4 @@ public abstract class JDBCBaseOpDispenser extends BaseOpDispenser numConnInput) { + throw new JDBCAdapterInvalidParamException( + "JDBC connection is NOT thread safe. For write workload, the total NB thread number (" + threadNum + + ") can NOT be greater than the maximum connection number 'num_conn' (" + numConnInput + ")" + ); + } + } + maxNumConnFinal = Math.min(threadNum, maxNumConnFinal); + if (maxNumConnFinal < 1) { + throw new JDBCAdapterInvalidParamException( + "'num_conn' NB CLI parameter must be a positive number!" + ); + } + jdbcSpace.setMaxNumConn(maxNumConnFinal); + + logger.info("Total {} JDBC connections will be created [isReadStmt:{}, threads/{}, num_conn/{}]; " + + "dml_batch: {}, autoCommit: {}", + maxNumConnFinal, isReadStmt, threadNum, numConnInput, + jdbcSpace.getDmlBatchNum(), jdbcSpace.isAutoCommit()); + + // TODO: this is a current limitation applied by this adapter + // improve this behavior by allowing the user to choose if (!isPreparedStatement && !isReadStatement) { throw new JDBCAdapterInvalidParamException("DML write statements MUST be prepared!"); } @@ -69,8 +102,6 @@ public class JDBCDMLOpDispenser extends JDBCBaseOpDispenser { @Override public JDBCDMLOp apply(long cycle) { - checkShutdownEntry(cycle); - if (isReadStatement) { return new JDBCDMLReadOp( jdbcSpace, diff --git a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDDLOp.java b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDDLOp.java index 2577ad91c..a5330e743 100644 --- a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDDLOp.java +++ b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDDLOp.java @@ -33,20 +33,11 @@ public class JDBCDDLOp extends JDBCOp { this.ddlStmtStr = ddlStmtStr; } - private Statement createDDLStatement() { - try { - return jdbcConnection.createStatement(); - } catch (SQLException e) { - throw new JDBCAdapterUnexpectedException( - "Unable to create a regular (non-prepared) JDBC statement"); - } - } @Override public Object apply(long value) { try { - Statement stmt = createDDLStatement(); + Statement stmt = jdbcConnection.createStatement(); stmt.execute(ddlStmtStr); - closeStatement(stmt); return true; } catch (SQLException sqlException) { throw new JDBCAdapterUnexpectedException( diff --git a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLOp.java 
b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLOp.java index a44e0d1c7..a7fb0ce80 100644 --- a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLOp.java +++ b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLOp.java @@ -18,7 +18,6 @@ package io.nosqlbench.adapter.jdbc.optypes; import io.nosqlbench.adapter.jdbc.JDBCSpace; import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterInvalidParamException; import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException; -import io.nosqlbench.adapter.jdbc.exceptions.JDBCPgVectorException; import io.nosqlbench.adapter.jdbc.utils.JDBCPgVector; import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; @@ -73,12 +72,12 @@ public abstract class JDBCDMLOp extends JDBCOp { } // Only applicable to a prepared statement - protected PreparedStatement setPrepStmtValues(PreparedStatement stmt, List valList) { + protected PreparedStatement setPrepStmtValues(PreparedStatement stmt) throws SQLException { assert (stmt != null); - for (int i=0; i LOG_COMMIT_SUCCESS); - } - } catch (SQLException e) { - throw new JDBCAdapterUnexpectedException("Failed to process JDBC statement commit!"); + + protected void processCommit() throws SQLException { + if (!jdbcConnection.getAutoCommit()) { + jdbcConnection.commit(); + LOGGER.debug(() -> LOG_COMMIT_SUCCESS); } } - protected Statement createDMLStatement() { + protected Statement createDMLStatement() throws SQLException { Statement stmt = jdbcStmtTL.get(); + if (stmt == null) { + if (isPreparedStmt) + stmt = jdbcConnection.prepareStatement(pStmtSqlStr); + else + stmt = jdbcConnection.createStatement(); - try { - if (stmt == null) { - if (isPreparedStmt) - stmt = jdbcConnection.prepareStatement(pStmtSqlStr); - else - stmt = jdbcConnection.createStatement(); + jdbcStmtTL.set(stmt); - jdbcStmtTL.set(stmt); - - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("A statement is created -- prepared: {}, read/write: {}, stmt: {}", - isPreparedStmt, - isReadStmt ? "read" : "write", - stmt); - } + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("A statement is created -- prepared: {}, read/write: {}, stmt: {}", + isPreparedStmt, + isReadStmt ? 
"read" : "write", + stmt); } - - return stmt; - } catch (SQLException e) { - throw new JDBCAdapterUnexpectedException( - "Unable to create a prepared JDBC statement"); } + return stmt; } } diff --git a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLReadOp.java b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLReadOp.java index e1d303f79..a61e02002 100644 --- a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLReadOp.java +++ b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLReadOp.java @@ -41,12 +41,12 @@ public class JDBCDMLReadOp extends JDBCDMLOp { @Override public Object apply(long value) { - Statement stmt = super.createDMLStatement(); - if (isPreparedStmt) { - stmt = super.setPrepStmtValues((PreparedStatement) stmt, this.pStmtValList); - } + try { + Statement stmt = super.createDMLStatement(); + if (isPreparedStmt) { + stmt = setPrepStmtValues((PreparedStatement) stmt); + } - try { // key string list to be used in the "Vector" relevancy score verification List verifierValueList = new ArrayList<>(); @@ -59,7 +59,6 @@ public class JDBCDMLReadOp extends JDBCDMLOp { verifierValueList.add(keyVal); } } while (rs.next()); - closeStatement(stmt); } else { boolean isResultSet = ((PreparedStatement)stmt).execute(); @@ -82,8 +81,6 @@ public class JDBCDMLReadOp extends JDBCDMLOp { } isResultSet = stmt.getMoreResults(); } - - closeStatement(stmt); } return verifierValueList; diff --git a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLWriteOp.java b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLWriteOp.java index adfa7862d..68999789c 100644 --- a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLWriteOp.java +++ b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLWriteOp.java @@ -20,6 +20,7 @@ import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.List; @@ -44,24 +45,22 @@ public class JDBCDMLWriteOp extends JDBCDMLOp { @Override public Object apply(long value) { int trackingCnt = threadBatchTrackingCntTL.get(); - trackingCnt = trackingCnt + 1; + trackingCnt++; threadBatchTrackingCntTL.set(trackingCnt); - PreparedStatement stmt = (PreparedStatement) super.createDMLStatement(); - stmt = super.setPrepStmtValues(stmt, this.pStmtValList); - try { + assert (isPreparedStmt); + PreparedStatement stmt = (PreparedStatement) super.createDMLStatement(); + stmt = super.setPrepStmtValues(stmt); + // No batch if (ddlStmtBatchNum == 1) { int result_cnt = stmt.executeUpdate(); super.processCommit(); - closeStatement(stmt); - if (LOGGER.isDebugEnabled()) { LOGGER.debug("[single ddl - execution] cycle:{}, result_cnt: {}, stmt: {}", value, result_cnt, stmt); } - return result_cnt; } // Use batch @@ -71,12 +70,12 @@ public class JDBCDMLWriteOp extends JDBCDMLOp { LOGGER.debug("[batch ddl - adding to batch] cycle:{}, stmt: {}", value, stmt); } - - if ( (trackingCnt % ddlStmtBatchNum == 0) || jdbcSpace.isShuttingDown() ) { + // NOTE: if the total number of cycles is not the multiple of the batch number, + // some records in a batch may not be written to the database + // To avoid this, make sure the total cycle number is the multiple of the batch number + if (trackingCnt % ddlStmtBatchNum == 0) { int[] counts = stmt.executeBatch(); 
processCommit(); - closeStatement(stmt); - if (LOGGER.isDebugEnabled()) { LOGGER.debug("[batch ddl - execution] cycle:{}, total_batch_res_cnt:{}, stmt: {}", value, counts, stmt); diff --git a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCOp.java b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCOp.java index 38e1189af..11d06fe9c 100644 --- a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCOp.java +++ b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCOp.java @@ -18,14 +18,14 @@ package io.nosqlbench.adapter.jdbc.optypes; import io.nosqlbench.adapter.jdbc.JDBCSpace; import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException; +import io.nosqlbench.adapter.jdbc.utils.JDBCPgVector; import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.*; +import java.util.Properties; +import java.util.Random; public abstract class JDBCOp implements CycleOp { private static final Logger LOGGER = LogManager.getLogger(JDBCOp.class); @@ -34,18 +34,50 @@ public abstract class JDBCOp implements CycleOp { protected final JDBCSpace jdbcSpace; protected final Connection jdbcConnection; + private final Random random = new Random(); public JDBCOp(JDBCSpace jdbcSpace) { this.jdbcSpace = jdbcSpace; - String curThreadName = Thread.currentThread().getName(); - this.jdbcConnection = this.jdbcSpace.getConnection(curThreadName); + this.jdbcConnection = getConnection(); } - protected void closeStatement(Statement stmt) throws SQLException { - if (! (stmt instanceof PreparedStatement)) { - stmt.close(); - } else if (jdbcSpace.isShuttingDown()) { - stmt.close(); - } + private Connection getConnection() { + int rnd = random.nextInt(0, jdbcSpace.getMaxNumConn()); + final String connectionName = "jdbc-conn-" + rnd; + + return jdbcSpace.getConnection( + new JDBCSpace.ConnectionCacheKey(connectionName), () -> { + try { + Connection connection; + + if (jdbcSpace.useHikariCP()) { + connection = jdbcSpace.getHikariDataSource().getConnection(); + } + // Use DriverManager directly + else { + String url = jdbcSpace.getConnConfig().getJdbcUrl(); + Properties props = jdbcSpace.getConnConfig().getDataSourceProperties(); + props.put("user", jdbcSpace.getConnConfig().getUsername()); + props.put("password", jdbcSpace.getConnConfig().getPassword()); + connection = DriverManager.getConnection(url, props); + } + + // Register 'vector' type + JDBCPgVector.addVectorType(connection); + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("A new JDBC connection ({}) is successfully created: {}", + connectionName, connection); + } + + return connection; + } + catch (Exception ex) { + String exp = "Exception occurred while attempting to create a connection (useHikariCP=" + + jdbcSpace.useHikariCP() + ")"; + LOGGER.error(exp, ex); + throw new JDBCAdapterUnexpectedException(exp); + } + }); } } diff --git a/adapter-jdbc/src/main/resources/activities.baselinesv2/cockroachdb-keyvalue.yaml b/adapter-jdbc/src/main/resources/activities.baselinesv2/cockroachdb-keyvalue.yaml deleted file mode 100644 index c9b16db5f..000000000 --- a/adapter-jdbc/src/main/resources/activities.baselinesv2/cockroachdb-keyvalue.yaml +++ /dev/null @@ -1,62 +0,0 @@ -# run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags="block:schema" threads=AUTO 
cycles=4 url="jdbc:postgresql://host:port/database" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName=insectdb sslrootcert="/path/to/postgresql_certs/root.crt" -vv --show-stacktraces -min_version: "5.17.2" - -description: | - A workload with only text keys and text values. This is based on the CQL keyvalue workloads as found - in cql-keyvalue2.yaml. - -scenarios: - default: - schema: run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags==block:schema threads=1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" - rampup: run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags==block:rampup threads=AUTO cycles===TEMPLATE(rampup-cycles,100) url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" - main: run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags==block:'main.*' threads=AUTO cycles===TEMPLATE(main-cycles,100) url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" - -params: - instrument: TEMPLATE(instrument,false) - -bindings: - seq_key: Mod(TEMPLATE(keycount,1000000000)); ToString() -> String - seq_value: Hash(); Mod(TEMPLATE(valuecount,1000000000)); ToString() -> String - rw_key: <int>>; ToString() -> String - rw_value: Hash(); <int>>; ToString() -> String - -blocks: - schema: - ops: - drop-database: - execute: | - DROP DATABASE IF EXISTS TEMPLATE(database,baselines); - create-database: - execute: | - CREATE DATABASE IF NOT EXISTS TEMPLATE(database,baselines); - drop-table: - execute: | - DROP TABLE IF EXISTS TEMPLATE(database,baselines).TEMPLATE(table,keyvalue); - create-table: - execute: | - CREATE TABLE IF NOT EXISTS TEMPLATE(database,baselines).TEMPLATE(table,keyvalue) - (key STRING PRIMARY KEY, value STRING); - - rampup: - params: - ops: - rampup-insert: - update: | - INSERT INTO TEMPLATE(database,baselines).TEMPLATE(table,keyvalue) - (key, value) VALUES ({seq_key},{seq_value}); - - main-read: - params: - ratio: TEMPLATE(read_ratio,5) - ops: - main-select: - query: | - SELECT * FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,keyvalue) WHERE key='{rw_key}'; - main-write: - params: - ratio: TEMPLATE(write_ratio,5) - ops: - main-insert: - update: | - INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,keyvalue) - (key, value) VALUES ('{rw_key}', '{rw_value}'); diff --git a/adapter-jdbc/src/main/resources/activities.baselinesv2/cockroachdb-tabular.yaml b/adapter-jdbc/src/main/resources/activities.baselinesv2/cockroachdb-tabular.yaml deleted file mode 100644 index 052f72071..000000000 --- a/adapter-jdbc/src/main/resources/activities.baselinesv2/cockroachdb-tabular.yaml +++ /dev/null @@ -1,157 +0,0 @@ -min_version: "5.17.2" - -description: | - A tabular workload with partitions, clusters, and data fields - This workload contains partitioning and cluster along with a set - of 8 fields of varying length. The field values vary in size according - to the fibonacci sequence times a base size factor of 10, with - an additional 10% variance for each field. - The read patterns have a variety of field subsets specified. 
- - During rampup, all rows will be written partition by partition, - filling in all rows of that partition before moving on to the next. - Example: With a partition size of 1000 and 1B rows, there will be - 1000000 partitions. - - During main phase, the read patterns are varied with different - field sets. As well, the number of rows which will be returned - is varied between 1 and 10. - - By default, reads occur at the same ratio as writes, with main - phase writes writing full rows. - - You can bulk up the size of the payloads by 10x with addzeroes='0', - by 100x with addzeroes='00', and so on, but if you want to go higher - than 100x, you'll need to modify the workload with a larger reference - file in the HashedFileExtractToString(...) bindings. - -scenarios: - default: - schema: run driver=jdbc tags==block:schema cycles==UNDEF threads==1 - rampup: run driver=jdbc tags==block:rampup cycles===TEMPLATE(rampup-cycles,100) threads=auto - main: run driver=jdbc tags==block:'main.*' cycles===TEMPLATE(main-cycles,100) threads=auto - -params: - instrument: TEMPLATE(instrument,false) - -bindings: - # for ramp-up and verify phases - # - part_layout: Div(<>); ToString() -> String - clust_layout: Mod(<>); ToString() -> String - # todo: update these definitions to use the simpler 10,0.1, 20, 0.2, ... - data0: Add(10); HashedFileExtractToString('data/lorem_ipsum_full.txt',9TEMPLATE(addzeroes,),11TEMPLATE(addzeroes,)) - data1: Add(20); HashedFileExtractToString('data/lorem_ipsum_full.txt',18TEMPLATE(addzeroes,),22TEMPLATE(addzeroes,)) - data2: Add(30); HashedFileExtractToString('data/lorem_ipsum_full.txt',27TEMPLATE(addzeroes,),33TEMPLATE(addzeroes,)) - data3: Add(40); HashedFileExtractToString('data/lorem_ipsum_full.txt',45TEMPLATE(addzeroes,),55TEMPLATE(addzeroes,)) - data4: Add(50); HashedFileExtractToString('data/lorem_ipsum_full.txt',72TEMPLATE(addzeroes,),88TEMPLATE(addzeroes,)) - data5: Add(60); HashedFileExtractToString('data/lorem_ipsum_full.txt',107TEMPLATE(addzeroes,),143TEMPLATE(addzeroes,)) - data6: Add(70); HashedFileExtractToString('data/lorem_ipsum_full.txt',189TEMPLATE(addzeroes,),231TEMPLATE(addzeroes,)) - data7: Add(80); HashedFileExtractToString('data/lorem_ipsum_full.txt',306TEMPLATE(addzeroes,),374TEMPLATE(addzeroes,)) - - # for main phase - # for write - part_write: Hash(); Uniform(0,TEMPLATE(partcount,100))->int; ToString() -> String - clust_write: Hash(); Add(1); Uniform(0,TEMPLATE(partsize,1000000))->int; ToString() -> String - data_write: Hash(); HashedFileExtractToString('data/lorem_ipsum_full.txt',50,150) -> String - - # for read - limit: Uniform(1,10) -> int - part_read: Uniform(0,TEMPLATE(partcount,100))->int; ToString() -> String - clust_read: Add(1); Uniform(0,TEMPLATE(partsize,1000000))->int; ToString() -> String - -blocks: - schema: - params: - prepared: false - ops: - #drop-database: - # execute: | - # DROP DATABASE IF EXISTS TEMPLATE(database,baselines); - create-database: - execute: | - CREATE DATABASE IF NOT EXISTS TEMPLATE(database,baselines); - drop-table: - execute: | - DROP TABLE IF EXISTS TEMPLATE(database,baselines).TEMPLATE(table,tabular); - create-table: - execute: | - CREATE TABLE IF NOT EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular) ( - part STRING, - clust STRING, - data0 STRING, data1 STRING, data2 STRING, data3 STRING, - data4 STRING, data5 STRING, data6 STRING, data7 STRING, - PRIMARY KEY (part,clust) - ); - - rampup: - params: - ops: - rampup-insert: - update: | - INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular) - 
(part,clust,data0,data1,data2,data3,data4,data5,data6,data7) - VALUES ( - '{part_layout}','{clust_layout}','{data0}','{data1}','{data2}', - '{data3}','{data4}','{data5}','{data6}','{data7}' - ); - - verify: - params: - cl: TEMPLATE(read_cl,LOCAL_QUORUM) - ops: - verify-select: - query: | - SELECT * FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular) - WHERE part='{part_layout}' - AND clust='{clust_layout}' - - main-read: - params: - ratio: TEMPLATE(read_ratio,1) - ops: - main-select-all: - query: | - SELECT * FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular) - WHERE part='{part_read}' LIMIT {limit}; - main-select-01: - query: | - SELECT data0,data1 from TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular) - WHERE part='{part_read}' LIMIT {limit}; - main-select-0246: - query: | - SELECT data0,data2,data4,data6 FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular) - WHERE part='{part_read}' LIMIT {limit}; - main-select-1357: - query: | - SELECT data1,data3,data5,data7 FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular) - WHERE part='{part_read}' LIMIT {limit}; - main-select-0123: - query: | - SELECT data0,data1,data2,data3 FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular) - WHERE part='{part_read}' LIMIT {limit}; - main-select-4567: - query: | - SELECT data4,data5,data6,data7 FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular) - WHERE part='{part_read}' LIMIT {limit}; - main-select-67: - query: | - SELECT data6,data7 FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular) - WHERE part='{part_read}' LIMIT {limit}; - main-select: - query: | - SELECT data0,data1,data2,data3,data4,data5,data6,data7 - FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular) - WHERE part='{part_read}' LIMIT {limit}; - main-write: - params: - ratio: TEMPLATE(write_ratio,8) - ops: - main-write: - update: | - INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular) - (part, clust, data0,data1,data2,data3,data4,data5,data6,data7) - VALUES ( - '{part_write}','{clust_write}','{data0}','{data1}','{data2}', - '{data3}','{data4}','{data5}','{data6}','{data7}' - ); diff --git a/adapter-jdbc/src/main/resources/activities.baselinesv2/cockroachdb-timeseries.yaml b/adapter-jdbc/src/main/resources/activities.baselinesv2/cockroachdb-timeseries.yaml deleted file mode 100644 index 053ef2cb4..000000000 --- a/adapter-jdbc/src/main/resources/activities.baselinesv2/cockroachdb-timeseries.yaml +++ /dev/null @@ -1,85 +0,0 @@ -# jara -jar nb5.jar cockroachdb-timeseries default -vv --show-stacktraces -min_version: "5.17.2" - -description: | - This workload emulates a time-series data model and access patterns. 
- -scenarios: - default: - schema: run driver=jdbc tags==block:schema cycles==UNDEF threads==1 url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" - rampup: run driver=jdbc tags==block:rampup cycles===TEMPLATE(rampup-cycles,100) threads=auto url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" - main: run driver=jdbc tags==block:'main.*' cycles===TEMPLATE(main-cycles,100) threads=auto url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" - -params: - instrument: TEMPLATE(instrument,false) - -bindings: - machine_id: Mod(TEMPLATE(sources,10000)); ToHashedUUID() -> java.util.UUID - sensor_name: HashedLineToString('data/variable_words.txt') - time: Mul(TEMPLATE(timespeed,100)L); Div(TEMPLATE(sources,10000)L); ToJavaInstant() - cell_timestamp: Mul(TEMPLATE(timespeed,100)L); Div(TEMPLATE(sources,10000)L); Mul(1000L) - sensor_value: Normal(0.0,5.0); Add(100.0) -> double - station_id: Div(TEMPLATE(sources,10000));Mod(TEMPLATE(stations,100)); ToHashedUUID() -> java.util.UUID - data: HashedFileExtractToString('data/lorem_ipsum_full.txt',800TEMPLATE(addzeroes,),1200TEMPLATE(addzeroes,)) - -blocks: - schema: - params: - ops: - drop-database: - #execute: | - # DROP DATABASE IF EXISTS TEMPLATE(database,baselines); - create-database: - execute: | - CREATE DATABASE IF NOT EXISTS TEMPLATE(database,baselines); - drop-table: - execute: | - DROP TABLE IF EXISTS TEMPLATE(database,baselines).TEMPLATE(table,iot); - create-table: - execute: | - CREATE TABLE IF NOT EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,iot) ( - machine_id UUID, - sensor_name STRING, - time TIMESTAMP, - sensor_value FLOAT, - station_id UUID, - data STRING, - PRIMARY KEY (machine_id ASC, sensor_name ASC), - INDEX TEMPLATE(table,iot)_time_desc_idx (time DESC) - ); - - rampup: - params: - ops: - insert-rampup: - update: | - INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,iot) - (machine_id, sensor_name, time, sensor_value, station_id, data) - VALUES ( - '{machine_id}', '{sensor_name}', '{time}', {sensor_value}, '{station_id}', '{data}' - ); - - #using timestamp {cell_timestamp} - - main-read: - params: - ratio: TEMPLATE(read_ratio,1) - ops: - select-read: - query: | - SELECT * FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,iot) - WHERE machine_id='{machine_id}' and sensor_name='{sensor_name}' - LIMIT TEMPLATE(limit,10); - main-write: - params: - ratio: TEMPLATE(write_ratio,9) - ops: - insert-main: - update: | - INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,iot) - (machine_id, sensor_name, time, sensor_value, station_id, data) - VALUES ( - '{machine_id}', '{sensor_name}', '{time}', {sensor_value}, '{station_id}', '{data}' - ); - - #using timestamp {cell_timestamp} diff --git a/adapter-jdbc/src/main/resources/activities.baselinesv2/pgvector.yaml b/adapter-jdbc/src/main/resources/activities.baselinesv2/pgvector.yaml new file mode 100644 index 000000000..f83b2e34f --- /dev/null +++ b/adapter-jdbc/src/main/resources/activities.baselinesv2/pgvector.yaml @@ -0,0 +1,100 @@ +# run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags="block:schema" threads=AUTO cycles=4 
url="jdbc:postgresql://host:port/database" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName=insectdb sslrootcert="/path/to/postgresql_certs/root.crt" -vv --show-stacktraces +min_version: "5.17.2" + +description: | + A workload which reads ann-benchmarks vector data from HDF5 file format for PostgreSql with baselinetor. + +scenarios: + default: + # supabase environment + drop: run driver=jdbc tags==block:drop threads===1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" + schema: run driver=jdbc tags==block:schema threads===1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" + testann: run driver=jdbc tags==block:testann threads=AUTO cycles===TEMPLATE(main-cycles,1000) url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" + train: run driver=jdbc tags==block:train threads=AUTO cycles===TEMPLATE(trainsize) dml_batch=120 autoCommit=false url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" + +bindings: + rw_key: ToString(); + train_floatlist: HdfFileToFloatList("testdata/TEMPLATE(datasetfile).hdf5", "/train"); ToCqlVector(); + test_floatlist: HdfFileToFloatList("testdata/TEMPLATE(datasetfile).hdf5", "/test"); ToCqlVector(); + relevant_indices: HdfFileToIntArray("testdata/TEMPLATE(datasetfile).hdf5", "/neighbors") + +blocks: + drop: + ops: + drop_vector_index: + ddl: | + DROP INDEX IF EXISTS idx_TEMPLATE(tablename,baseline)_TEMPLATE(indextype)_TEMPLATE(similarity_function); + drop_table: + ddl: | + DROP TABLE IF EXISTS TEMPLATE(schemaname,public).TEMPLATE(tablename,baseline); + ## + # NOTE: Do NOT enable this block for 'runall.sh' script + # -------------------------------------------------- + # drop_schema: + # ddl: | + # DROP SCHEMA IF EXISTS TEMPLATE(schemaname,public); + + schema: + ops: + create_schema: + ddl: | + CREATE SCHEMA IF NOT EXISTS TEMPLATE(schemaname,public); + create_table: + ddl: | + CREATE TABLE IF NOT EXISTS TEMPLATE(schemaname,public).TEMPLATE(tablename,baseline) + (key TEXT PRIMARY KEY, value vector(TEMPLATE(dimensions,5))); + create_vector_index: + ddl: | + CREATE INDEX IF NOT EXISTS idx_TEMPLATE(tablename,baseline)_TEMPLATE(indextype)_TEMPLATE(similarity_function) + ON TEMPLATE(schemaname,public).TEMPLATE(tablename,baseline) + USING TEMPLATE(indextype) (value vector_TEMPLATE(similarity_function)_ops) + WITH (TEMPLATE(indexopt)); + + train: + params: + prepared: true + ops: + main_insert: + dmlwrite: | + INSERT INTO TEMPLATE(schemaname,public).TEMPLATE(tablename,baseline) VALUES (?,?) + ON CONFLICT DO NOTHING; + prep_stmt_val_arr: | + {rw_key},{train_floatlist} + + testann: + params: + prepared: true + ops: + # NOTE: right now this is only for cosine similarity. + # in baselinetor, '<=>' is for cosine similarity + # '<->' is for euclidean distance + # '<#>' is for inner product + main_select: + dmlread: | + SELECT * + FROM TEMPLATE(schemaname,public).TEMPLATE(tablename,baseline) + ORDER BY value <=> ? 
+ LIMIT TEMPLATE(top_k,100); + prep_stmt_val_arr: | + {test_floatlist} + ################################# + ## NOTE: + # 1). The script blocks below are ONLY relevant with Vector relevancy score verification + # 2). The "verifier-key" must match the Vector data identifier column name (e.g. primary key name) + # right now the identifier must be a type that can be converted to int. + verifier-key: "key" + verifier-init: | + relevancy=scriptingmetrics.newRelevancyMeasures(_parsed_op); + k=TEMPLATE(top_k,100) + relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.recall("recall",k)); + relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.precision("precision",k)); + relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.F1("F1",k)); + relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.reciprocal_rank("RR",k)); + relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.average_precision("AP",k)); + verifier: | + // driver-specific function + actual_indices=pgvec_utils.getValueListForVerifierKey(result); + // driver-agnostic function + relevancy.accept({relevant_indices},actual_indices); + // because we are "verifying" although this needs to be reorganized + return true; diff --git a/adapter-jdbc/src/main/resources/activities.baselinesv2/postgres-pgvector-relevancy.yaml b/adapter-jdbc/src/main/resources/activities.baselinesv2/postgres-pgvector-relevancy.yaml deleted file mode 100644 index 3bd89398d..000000000 --- a/adapter-jdbc/src/main/resources/activities.baselinesv2/postgres-pgvector-relevancy.yaml +++ /dev/null @@ -1,152 +0,0 @@ -# run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags="block:schema" threads=AUTO cycles=4 url="jdbc:postgresql://host:port/database" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName=insectdb sslrootcert="/path/to/postgresql_certs/root.crt" -vv --show-stacktraces -min_version: "5.17.2" - -description: | - A workload which reads ann-benchmarks vector data from HDF5 file format for PostgreSql with pgvector. - -scenarios: - default: - #### - # The following CLI parameters are required for all named scenarios: - # - schema: schema name - # - table: table name - #### - - ### - ## For DDL workload, turn on 'AutoCommit'. Turning it off will cause errors. - ### - drop-tbl: run driver=jdbc tags==block:drop-tbl threads==1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" autoCommit="true" - # The following CLI parameters is needed for 'create-tbl' named scenario: - # - dimensions: vector dimension size (MUST match the actual ANN benchmark data) - create-tbl: run driver=jdbc tags==block:create-tbl threads==1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" autoCommit="true" - # - # Vectors with up to 2,000 dimensions can be indexed. 
- # - # The following extra CLI parameter is needed for both 'create-vec-idx' and 'drop-vec-idx' named scenarios: - # - indexName: index name - drop-vec-idx: run driver=jdbc tags==block:drop-vec-idx threads==1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" autoCommit="true" - # The following extra CLI parameters are needed for 'create-vec-idx' named scenario: - # - indexType: index type; valid values: 'ivfflat' or 'hnsw' (see: https://github.com/pgvector/pgvector#indexing) - # - indexOpt: index options - # * for 'ivfflat' index type, the option is like: "lists=" - # * for 'hnsw' index type, the option is like: "m=,ef_construction =" - # - relFunc: relevancy function; valid values: 'l2' (L2 distance), 'ip' (Inner product), or 'cosine' (Cosine distance) - create-vec-idx: run driver=jdbc tags==block:create-vec-idx threads==1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" autoCommit="true" - - ### - ## For DML workload, 'AutoCommit' can be off or on - ## - MUST be off when batching is used !! - ### - # The following extra CLI parameters is needed for both 'vec-read' and 'vec-write' named scenarios: - # - dataset: ANN benchmark testing dataset name - # - # The following extra CLI parameters is needed for 'vec-read' named scenario: - # - queryLimit: the number of the records to be returned as in 'LIMIT ' clause - vec-read: run driver=jdbc tags==block:vec-read cycles===TEMPLATE(main-cycles,100) threads=AUTO url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" autoCommit="true" - # The following extra CLI parameters is needed for 'vec-write' named scenario: - # - trainsize: the number of records to load from the dataset and insert into the table - vec-write: run driver=jdbc tags==block:vec-write cycles===TEMPLATE(trainsize) threads=AUTO url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" - -bindings: - rw_key: ToString() - train_vector: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/train"); - test_vector: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/test"); - validation_set: HdfFileToIntArray("testdata/TEMPLATE(dataset).hdf5", "/neighbors"); - -blocks: - drop-tbl: - params: - # DDL statement must NOT be prepared - prepared: false - ops: - drop-table: - ddl: | - DROP TABLE IF EXISTS TEMPLATE(schema,public).TEMPLATE(table,pgvec); - drop-schema: - ddl: | - DROP SCHEMA IF EXISTS TEMPLATE(schema,public); - - create-tbl: - params: - # DDL statement must NOT be prepared - prepared: false - ops: - create-schema: - ddl: | - CREATE SCHEMA IF NOT EXISTS TEMPLATE(schema,public); - create-table: - ddl: | - CREATE TABLE IF NOT EXISTS TEMPLATE(schema,public).TEMPLATE(table,pgvec) - (key TEXT PRIMARY KEY, value vector(<>)); - - drop-vec-idx: - params: - # DDL statement must NOT be prepared - prepared: false - ops: - drop-vector-index: - ddl: | - DROP INDEX IF EXISTS TEMPLATE(schema,public).TEMPLATE(indexName,TEMPLATE(dataset)-idx); - - create-vec-idx: - params: - # DDL statement must NOT 
be prepared - prepared: false - ops: - create-vector-index: - ddl: | - CREATE INDEX IF NOT EXISTS TEMPLATE(indexName,TEMPLATE(dataset)-idx-TEMPLATE(indexType)) - ON TEMPLATE(schema,public).TEMPLATE(table,pgvec) - USING TEMPLATE(indexType) (value vector_TEMPLATE(relFunc)_ops) - WITH (TEMPLATE(indexOpt)); - - # Using PostgreSQl upsert (INSERT ON CONFLICT statement) - vec-write: - params: - # DML write statement MUST be prepared - prepared: true - ops: - main-insert: - dmlwrite: | - INSERT INTO TEMPLATE(schema,public).TEMPLATE(table,pgvec) VALUES (?,?) - ON CONFLICT DO NOTHING; - prep_stmt_val_arr: | - {rw_key},{test_vector} - - vec-read: - ops: - params: - # DML READ statement can be prepared or not - prepared: true - main-select: - dmlread: | - SELECT key, (value <-> ?) as relevancy, value - FROM TEMPLATE(schema,public).TEMPLATE(table,pgvec) - ORDER BY value <-> ? - LIMIT TEMPLATE(queryLimit,100); - prep_stmt_val_arr: | - {test_vector},{test_vector} - ################################# - ## NOTE: - # 1). The script blocks below are ONLY relevant with Vector relevancy score verification - # 2). The "verifier-key" must match the Vector data identifier column name (e.g. primary key name) - # right now the identifier must be a type that can be converted to int. - verifier-key: "key" - verifier-imports: - - io.nosqlbench.adapter.mongodb.MongoDbUtils - verifier-init: | - relevancy=scriptingmetrics.newRelevancyMeasures(_parsed_op); - for (int k in List.of(100)) { - relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.recall("recall",k)); - relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.precision("precision",k)); - relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.F1("F1",k)); - relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.reciprocal_rank("RR",k)); - relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.average_precision("AP",k)); - } - verifier: | - // driver-specific function - actual_indices=pgvec_utils.getValueListForVerifierKey(result); - // driver-agnostic function - relevancy.accept({validation_set},actual_indices); - // because we are "verifying" although this needs to be reorganized - return true;
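As a closing reference, the connection-sizing rule that this change moves from JDBCOpMapper/JDBCSpace into JDBCDMLOpDispenser can be restated as the standalone sketch below. The method name and exception type are illustrative assumptions; in the dispenser the result is passed to jdbcSpace.setMaxNumConn() and logged together with dml_batch and autoCommit.

// Illustrative restatement of the dispenser's sizing logic (not code from this PR).
static int effectiveConnectionCount(int threadNum, int numConnInput, boolean isReadStmt) {
    if (!isReadStmt && threadNum > numConnInput) {
        // JDBC connections are not thread safe, so a write workload needs one connection per thread.
        throw new IllegalArgumentException("For write workloads, the NB thread number (" + threadNum
            + ") can not be greater than 'num_conn' (" + numConnInput + ")");
    }
    int maxNumConn = Math.min(threadNum, numConnInput); // e.g. threads=20, num_conn=5 gives 5 connections for reads
    if (maxNumConn < 1) {
        throw new IllegalArgumentException("'num_conn' must be a positive number!");
    }
    return maxNumConn;
}

For example, threads=20 with num_conn=5 yields 5 shared connections for a read workload, while the same settings on a write workload are rejected, matching the exception message added in the dispenser.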