Mirror of https://github.com/nosqlbench/nosqlbench.git (synced 2025-02-25 18:55:28 -06:00)
Merge pull request #1640 from yabinmeng/main
Enhancement of the JDBC adapter (to support pgvector recall testing)
@@ -51,7 +51,7 @@
|
||||
<dependency>
|
||||
<groupId>org.postgresql</groupId>
|
||||
<artifactId>postgresql</artifactId>
|
||||
<version>42.5.2</version>
|
||||
<version>42.6.0</version>
|
||||
</dependency>
|
||||
|
||||
<!-- https://search.maven.org/artifact/com.zaxxer/HikariCP -->
|
||||
|
||||
@@ -52,15 +52,6 @@ public class JDBCOpMapper implements OpMapper<JDBCOp> {
|
||||
String spaceName = op.getStaticConfigOr("space", "default");
|
||||
JDBCSpace jdbcSpace = spaceCache.get(spaceName);
|
||||
|
||||
int nbThreadNum = NumberUtils.toInt(op.getStaticConfig("threads", String.class));
|
||||
int maxConnNum = jdbcSpace.getMaxNumConn();
|
||||
if (nbThreadNum > maxConnNum) {
|
||||
throw new JDBCAdapterInvalidParamException(
|
||||
"JDBC connection is NOT thread safe. The total NB thread number (" + nbThreadNum +
|
||||
") can NOT be greater than the maximum connection number 'num_conn' (" + maxConnNum + ")"
|
||||
);
|
||||
}
|
||||
|
||||
/*
|
||||
* If the user provides a body element, then they want to provide the JSON or
|
||||
* a data structure that can be converted into JSON, bypassing any further
|
||||
|
||||
@@ -20,8 +20,6 @@ import com.zaxxer.hikari.HikariConfig;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterInvalidParamException;
|
||||
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException;
|
||||
import io.nosqlbench.adapter.jdbc.utils.JDBCAdapterUtil;
|
||||
import io.nosqlbench.adapter.jdbc.utils.JDBCPgVector;
|
||||
import io.nosqlbench.api.config.standard.ConfigModel;
|
||||
import io.nosqlbench.api.config.standard.NBConfigModel;
|
||||
import io.nosqlbench.api.config.standard.NBConfiguration;
|
||||
@@ -33,12 +31,9 @@ import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class JDBCSpace implements AutoCloseable {
|
||||
private final static Logger logger = LogManager.getLogger(JDBCSpace.class);
|
||||
@@ -48,59 +43,38 @@ public class JDBCSpace implements AutoCloseable {
|
||||
// NOTE: Since a JDBC connection is NOT thread-safe, the total number of NB threads MUST be less
// than or equal to this number, so that there is at most one thread per connection.
|
||||
private final static int DEFAULT_CONN_NUM = 5;
|
||||
private final int maxNumConn;
|
||||
private int maxNumConn = DEFAULT_CONN_NUM;
|
||||
|
||||
// For DML statements, how many statements to put together in one batch
|
||||
// For DML write statements, how many statements to put together in one batch
|
||||
// - 1 : no batch (default)
|
||||
// - positive number: using batch
|
||||
private final static int DEFAULT_DML_BATCH_NUM = 1;
|
||||
private final int dmlBatchNum;
|
||||
private int dmlBatchNum = DEFAULT_DML_BATCH_NUM;
|
||||
|
||||
private final long totalCycleNum;
|
||||
private static boolean isShuttingDown = false;
|
||||
private long totalCycleNum;
|
||||
private int totalThreadNum;
|
||||
private boolean autoCommitCLI;
|
||||
|
||||
private HikariConfig hikariConfig;
|
||||
private boolean useHikariCP;
|
||||
private final HikariConfig connConfig = new HikariConfig();
|
||||
private HikariDataSource hikariDataSource;
|
||||
|
||||
ConcurrentHashMap<String, Connection> connections = new ConcurrentHashMap<>();
|
||||
// Maintain client-side pooling just to make sure the allocated connections can
// be reclaimed quickly, instead of waiting for the Hikari pool to reclaim them eventually
|
||||
public record ConnectionCacheKey(String connName) {
|
||||
}
|
||||
private final ConcurrentHashMap<ConnectionCacheKey, Connection> connections = new ConcurrentHashMap<>();
|
||||
|
||||
public JDBCSpace(String spaceName, NBConfiguration cfg) {
|
||||
this.spaceName = spaceName;
|
||||
this.totalCycleNum = NumberUtils.toLong(cfg.getOptional("cycles").orElse("1"));
|
||||
int totalThreads = NumberUtils.toInt(cfg.getOptional("threads").orElse("1"));
|
||||
int numConnInput = NumberUtils.toInt(cfg.getOptional("num_conn").orElse("10"));
|
||||
this.maxNumConn = Math.min(totalThreads, numConnInput);
|
||||
if (this.maxNumConn < 1) {
|
||||
throw new JDBCAdapterInvalidParamException(
|
||||
"'num_conn' NB CLI parameter must be a positive number!"
|
||||
);
|
||||
}
|
||||
|
||||
// Must come after the 'maxNumConn' statements and before the remaining statements!
|
||||
this.initializeSpace(cfg);
|
||||
|
||||
this.dmlBatchNum = NumberUtils.toInt(cfg.getOptional("dml_batch").orElse("1"));
|
||||
if (this.dmlBatchNum < 1) {
|
||||
throw new JDBCAdapterInvalidParamException(
|
||||
"'dml_batch' NB CLI parameter must be a positive number!"
|
||||
);
|
||||
}
|
||||
// According to JDBC spec,
|
||||
// - The commit behavior of executeBatch is always implementation-defined
|
||||
// when an error occurs and auto-commit is true.
|
||||
//
|
||||
// In this adapter, it is treated as an error to have 'autoCommit' ON and batching enabled at the same time.
|
||||
if ( (this.dmlBatchNum > 1) && (hikariConfig.isAutoCommit()) ) {
|
||||
if ( (this.dmlBatchNum > 1) && isAutoCommit() ) {
|
||||
throw new JDBCAdapterInvalidParamException(
|
||||
"Using batch, 'dml_batch'(" + this.dmlBatchNum + ") > 1, along with 'autoCommit' ON is not supported!"
|
||||
);
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("{} JDBC connections will be created [max(threads/{}, num_conn/{}]; " +
|
||||
"dml_batch: {}, autoCommit: {}",
|
||||
maxNumConn, totalThreads, numConnInput, dmlBatchNum, hikariConfig.isAutoCommit());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -108,62 +82,67 @@ public class JDBCSpace implements AutoCloseable {
|
||||
shutdownSpace();
|
||||
}
|
||||
|
||||
public int getMaxNumConn() { return this.maxNumConn; }
|
||||
public int getMaxNumConn() { return maxNumConn; }
|
||||
public void setMaxNumConn(int i) { maxNumConn = i; }
|
||||
|
||||
public int getDmlBatchNum() { return this.dmlBatchNum; }
|
||||
public int getDmlBatchNum() { return dmlBatchNum; }
|
||||
|
||||
public long getTotalCycleNum() { return this.totalCycleNum; }
|
||||
public long getTotalCycleNum() { return totalCycleNum; }
|
||||
public int getTotalThreadNum() { return totalThreadNum; }
|
||||
|
||||
public boolean isShuttingDown() { return isShuttingDown; }
|
||||
public void enterShutdownStage() { isShuttingDown = true; }
|
||||
|
||||
|
||||
public HikariDataSource getHikariDataSource() {
|
||||
return this.hikariDataSource;
|
||||
public boolean isAutoCommit() {
|
||||
if (useHikariCP)
|
||||
return connConfig.isAutoCommit();
|
||||
else
|
||||
return this.autoCommitCLI;
|
||||
}
|
||||
public boolean useHikariCP() { return useHikariCP; }
|
||||
public HikariConfig getConnConfig() { return connConfig; }
|
||||
|
||||
public Connection getConnection(String connectionName) {
|
||||
Connection connection = connections.get(connectionName);
|
||||
if (connection == null) {
|
||||
try {
|
||||
connection = hikariDataSource.getConnection();
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JDBC connection ({}) is successfully created: {}",
|
||||
connectionName, connection);
|
||||
}
|
||||
|
||||
// Register 'vector' type
|
||||
JDBCPgVector.addVectorType(connection);
|
||||
public HikariDataSource getHikariDataSource() { return hikariDataSource; }
|
||||
|
||||
connections.put(connectionName, connection);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
String exp = "Exception occurred while attempting to create a connection using the HikariDataSource";
|
||||
logger.error(exp, ex);
|
||||
throw new JDBCAdapterUnexpectedException(exp);
|
||||
}
|
||||
}
|
||||
|
||||
return connection;
|
||||
public Connection getConnection(ConnectionCacheKey key, Supplier<Connection> connectionSupplier) {
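// computeIfAbsent runs the supplier at most once per key, so each named connection is
// created lazily on first lookup and then shared by later lookups of the same key.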
|
||||
return connections.computeIfAbsent(key, __ -> connectionSupplier.get());
|
||||
}
|
||||
|
||||
private void initializeSpace(NBConfiguration cfg) {
|
||||
hikariConfig = new HikariConfig();
|
||||
//
|
||||
// NOTE: Although it looks like a good idea to use Hikari connection pooling,
//       in testing it showed some strange behaviors, such as:
//       1) failing to allocate a connection while the target server was working fine
//          (e.g. it failed consistently on an m5d.4xlarge test bed but not on a Mac),
//       2) not really respecting the 'max_connections' setting, and
//       3) appearing to be slow when creating connections.
|
||||
//
|
||||
// Therefore, the `use_hikaricp` option controls whether to use Hikari connection pooling. When it is
// set to 'false', the adapter falls back to its own (simple) connection management and creates
// connections directly through the JDBC driver's `DriverManager`.
|
||||
//
|
||||
this.useHikariCP = BooleanUtils.toBoolean(cfg.getOptional("use_hikaricp").orElse("true"));
|
||||
this.autoCommitCLI = BooleanUtils.toBoolean(cfg.getOptional("autoCommit").orElse("true"));
|
||||
this.dmlBatchNum = NumberUtils.toInt(cfg.getOptional("dml_batch").orElse("1"));
|
||||
if (this.dmlBatchNum < 0) dmlBatchNum = 1;
|
||||
logger.info("CLI input parameters -- useHikariCP:{}, autoCommitCLI:{}, dmlBatchNum:{}",
|
||||
useHikariCP, autoCommitCLI, dmlBatchNum);
|
||||
|
||||
hikariConfig.setJdbcUrl(cfg.get("url"));
|
||||
hikariConfig.addDataSourceProperty("serverName", cfg.get("serverName"));
|
||||
this.totalCycleNum = NumberUtils.toLong(cfg.getOptional("cycles").orElse("1"));
|
||||
this.totalThreadNum = NumberUtils.toInt(cfg.getOptional("threads").orElse("1"));
|
||||
|
||||
connConfig.setJdbcUrl(cfg.get("url"));
|
||||
connConfig.addDataSourceProperty("serverName", cfg.get("serverName"));
|
||||
|
||||
Optional<String> databaseName = cfg.getOptional("databaseName");
|
||||
if (databaseName.isPresent()) {
|
||||
hikariConfig.addDataSourceProperty("databaseName", databaseName.get());
|
||||
connConfig.addDataSourceProperty("databaseName", databaseName.get());
|
||||
}
|
||||
|
||||
int portNumber = Integer.parseInt(cfg.get("portNumber"));
|
||||
hikariConfig.addDataSourceProperty("portNumber", portNumber);
|
||||
connConfig.addDataSourceProperty("portNumber", portNumber);
|
||||
|
||||
Optional<String> user = cfg.getOptional("user");
|
||||
if (user.isPresent()) {
|
||||
hikariConfig.setUsername(user.get());
|
||||
connConfig.setUsername(user.get());
|
||||
}
|
||||
|
||||
Optional<String> password = cfg.getOptional("password");
|
||||
@@ -171,7 +150,7 @@ public class JDBCSpace implements AutoCloseable {
|
||||
if (user.isEmpty()) {
|
||||
throw new OpConfigError("Both user and password options are required. Only password is supplied in this case.");
|
||||
}
|
||||
hikariConfig.setPassword(password.get());
|
||||
connConfig.setPassword(password.get());
|
||||
} else {
|
||||
if (user.isPresent()) {
|
||||
throw new OpConfigError("Both user and password options are required. Only user is supplied in this case.");
|
||||
@@ -179,77 +158,56 @@ public class JDBCSpace implements AutoCloseable {
|
||||
}
|
||||
|
||||
Optional<Boolean> ssl = cfg.getOptional(Boolean.class, "ssl");
|
||||
hikariConfig.addDataSourceProperty("ssl", ssl.orElse(false));
|
||||
connConfig.addDataSourceProperty("ssl", ssl.orElse(false));
|
||||
|
||||
Optional<String> sslMode = cfg.getOptional("sslmode");
|
||||
if (sslMode.isPresent()) {
|
||||
hikariConfig.addDataSourceProperty("sslmode", sslMode.get());
|
||||
connConfig.addDataSourceProperty("sslmode", sslMode.get());
|
||||
} else {
|
||||
hikariConfig.addDataSourceProperty("sslmode", "prefer");
|
||||
connConfig.addDataSourceProperty("sslmode", "prefer");
|
||||
}
|
||||
|
||||
Optional<String> sslCert = cfg.getOptional("sslcert");
|
||||
if (sslCert.isPresent()) {
|
||||
hikariConfig.addDataSourceProperty("sslcert", sslCert.get());
|
||||
connConfig.addDataSourceProperty("sslcert", sslCert.get());
|
||||
} /*else if(sslMode.isPresent() && (!"disable".equalsIgnoreCase(sslMode.get()) || !"allow".equalsIgnoreCase(sslMode.get())) || !"prefer".equalsIgnoreCase(sslMode.get())) {
|
||||
throw new OpConfigError("When sslmode is true, sslcert should be provided.");
|
||||
}*/
|
||||
|
||||
Optional<String> sslRootCert = cfg.getOptional("sslrootcert");
|
||||
if (sslRootCert.isPresent()) {
|
||||
hikariConfig.addDataSourceProperty("sslrootcert", sslRootCert.get());
|
||||
connConfig.addDataSourceProperty("sslrootcert", sslRootCert.get());
|
||||
}
|
||||
|
||||
hikariConfig.addDataSourceProperty("applicationName", cfg.get("applicationName"));
|
||||
hikariConfig.addDataSourceProperty("rewriteBatchedInserts", cfg.getOrDefault("rewriteBatchedInserts", true));
|
||||
connConfig.addDataSourceProperty("applicationName", cfg.get("applicationName"));
|
||||
connConfig.addDataSourceProperty("rewriteBatchedInserts", cfg.getOrDefault("rewriteBatchedInserts", true));
|
||||
|
||||
// We're managing the auto-commit behavior of connections ourselves and hence disabling the auto-commit.
|
||||
Optional<String> autoCommitOpt = cfg.getOptional("autoCommit");
|
||||
boolean autoCommit = false;
|
||||
if (autoCommitOpt.isPresent()) autoCommit = BooleanUtils.toBoolean(autoCommitOpt.get());
|
||||
hikariConfig.setAutoCommit(autoCommit);
|
||||
|
||||
hikariConfig.setKeepaliveTime(Integer.parseInt(cfg.get("keepaliveTime")));
|
||||
|
||||
// HikariCP "maximumPoolSize" parameter is ignored.
|
||||
connConfig.setKeepaliveTime(Integer.parseInt(cfg.get("keepaliveTime")));
|
||||
// Use the NB "num_conn" parameter instead, wth 20% extra capacity
|
||||
hikariConfig.setMaximumPoolSize((int)Math.ceil(1.2*maxNumConn));
|
||||
connConfig.setMaximumPoolSize((int) Math.ceil(1.2 * maxNumConn));
|
||||
|
||||
this.hikariDataSource = new HikariDataSource(hikariConfig);
|
||||
if (useHikariCP) {
|
||||
this.hikariDataSource = new HikariDataSource(connConfig);
|
||||
logger.info("hikariDataSource is created : {}", hikariDataSource);
|
||||
}
|
||||
}
|
||||
|
||||
private void shutdownSpace() {
|
||||
isShuttingDown = true;
|
||||
|
||||
try {
|
||||
waitUntilAllOpFinished(System.currentTimeMillis());
|
||||
|
||||
logger.info("Shutting down JDBCSpace -- total {} of connections is being closed ...", connections.size());
|
||||
for (Connection connection : connections.values()) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Close connection : {}", connection);
|
||||
}
|
||||
connection.close();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new JDBCAdapterUnexpectedException("Unexpected error when trying to close the JDBC connection!");
|
||||
}
|
||||
|
||||
hikariDataSource.close();
|
||||
}
|
||||
|
||||
private void waitUntilAllOpFinished(long shutdownStartTimeMills) {
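// Grace period before closing resources: poll in one-second steps until
// 'timeToWaitInSec' seconds have elapsed since shutdown started, then close the data source.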
|
||||
final int timeToWaitInSec = 5;
|
||||
long timeElapsedMills;
|
||||
boolean continueChk;
|
||||
|
||||
do {
|
||||
JDBCAdapterUtil.pauseCurThreadExec(1);
|
||||
|
||||
long curTimeMills = System.currentTimeMillis();
|
||||
timeElapsedMills = curTimeMills - shutdownStartTimeMills;
|
||||
continueChk = (timeElapsedMills <= (timeToWaitInSec*1000));
|
||||
} while (continueChk);
|
||||
|
||||
logger.info(
|
||||
"shutdownSpace::waitUntilAllOpFinished -- " +
|
||||
"shutdown time elapsed: " + timeElapsedMills + "ms.");
|
||||
if (hikariDataSource != null) {
|
||||
hikariDataSource.close();
|
||||
}
|
||||
}
|
||||
|
||||
public static NBConfigModel getConfigModel() {
|
||||
@@ -259,8 +217,8 @@ public class JDBCSpace implements AutoCloseable {
|
||||
.add(Param.defaultTo("dml_batch", DEFAULT_DML_BATCH_NUM)
|
||||
.setDescription("The number of DML write statements in a batch. Defaults to 1. Ignored by DML read statements!" +
|
||||
DEFAULT_DML_BATCH_NUM + "' (no batch)"))
|
||||
.add(Param.defaultTo("url", "jdbc:postgresql:/")
|
||||
.setDescription("The connection URL used to connect to the DBMS. Defaults to 'jdbc:postgresql:/'"))
|
||||
.add(Param.defaultTo("use_hikaricp", "true")
|
||||
.setDescription("Whether to use Hikari connection pooling (default: true)!"))
|
||||
.add(Param.defaultTo("url", "jdbc:postgresql:/")
|
||||
.setDescription("The connection URL used to connect to the DBMS. Defaults to 'jdbc:postgresql:/'"))
|
||||
.add(Param.defaultTo("serverName", "localhost")
|
||||
|
||||
@@ -17,10 +17,12 @@
|
||||
package io.nosqlbench.adapter.jdbc.opdispensers;
|
||||
|
||||
import io.nosqlbench.adapter.jdbc.JDBCSpace;
|
||||
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterInvalidParamException;
|
||||
import io.nosqlbench.adapter.jdbc.optypes.JDBCOp;
|
||||
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
|
||||
import io.nosqlbench.adapters.api.templating.ParsedOp;
|
||||
import org.apache.commons.lang3.math.NumberUtils;
|
||||
|
||||
public abstract class JDBCBaseOpDispenser extends BaseOpDispenser<JDBCOp, JDBCSpace> {
|
||||
protected static final String ERROR_STATEMENT_CREATION =
|
||||
@@ -39,9 +41,4 @@ public abstract class JDBCBaseOpDispenser extends BaseOpDispenser<JDBCOp, JDBCSp
|
||||
this.isPreparedStatement = op.getStaticConfigOr("prepared", false);
|
||||
this.verifierKeyName = op.getStaticConfigOr("verifier-key", "");
|
||||
}
|
||||
public void checkShutdownEntry(long cycle) {
|
||||
if (cycle == (jdbcSpace.getTotalCycleNum()-1)) {
|
||||
jdbcSpace.enterShutdownStage();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,13 +41,14 @@ public class JDBCDDLOpDispenser extends JDBCBaseOpDispenser {
|
||||
this.isDdlStatement = true;
|
||||
this.ddlSqlStrFunc = sqlStrFunc;
|
||||
|
||||
// For DDL statements, must use autoCommit
|
||||
assert(jdbcSpace.isAutoCommit());
|
||||
if (isPreparedStatement) {
|
||||
throw new JDBCAdapterInvalidParamException("DDL statements can NOT be prepared!");
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public JDBCDDLOp apply(long cycle) {
|
||||
checkShutdownEntry(cycle);
|
||||
String ddlSqlStr = ddlSqlStrFunc.apply(cycle);
|
||||
return new JDBCDDLOp(jdbcSpace, ddlSqlStr);
|
||||
}
|
||||
|
||||
@@ -48,6 +48,39 @@ public class JDBCDMLOpDispenser extends JDBCBaseOpDispenser {
|
||||
this.isDdlStatement = false;
|
||||
this.isReadStatement = isReadStmt;
|
||||
|
||||
int numConnInput = Integer.parseInt(op.getStaticConfig("num_conn", String.class));
|
||||
|
||||
// Only apply the 'one-thread-per-connection' limit to the WRITE workload,
// because a PostgreSQL connection is not thread safe.
// For the READ workload, do NOT apply this limitation.
|
||||
int threadNum = jdbcSpace.getTotalThreadNum();
|
||||
int maxNumConnFinal = numConnInput;
|
||||
|
||||
// For the write workload, avoid thread-safety issues by constraining the connection number.
// For the read workload, it is OK to use more threads than available connections.
|
||||
if (!isReadStmt) {
|
||||
if (threadNum > numConnInput) {
|
||||
throw new JDBCAdapterInvalidParamException(
|
||||
"JDBC connection is NOT thread safe. For write workload, the total NB thread number (" + threadNum +
|
||||
") can NOT be greater than the maximum connection number 'num_conn' (" + numConnInput + ")"
|
||||
);
|
||||
}
|
||||
}
|
||||
maxNumConnFinal = Math.min(threadNum, maxNumConnFinal);
|
||||
if (maxNumConnFinal < 1) {
|
||||
throw new JDBCAdapterInvalidParamException(
|
||||
"'num_conn' NB CLI parameter must be a positive number!"
|
||||
);
|
||||
}
|
||||
jdbcSpace.setMaxNumConn(maxNumConnFinal);
|
||||
|
||||
logger.info("Total {} JDBC connections will be created [isReadStmt:{}, threads/{}, num_conn/{}]; " +
|
||||
"dml_batch: {}, autoCommit: {}",
|
||||
maxNumConnFinal, isReadStmt, threadNum, numConnInput,
|
||||
jdbcSpace.getDmlBatchNum(), jdbcSpace.isAutoCommit());
|
||||
|
||||
// TODO: this is a current limitation applied by this adapter
|
||||
// improve this behavior by allowing the user to choose
|
||||
if (!isPreparedStatement && !isReadStatement) {
|
||||
throw new JDBCAdapterInvalidParamException("DML write statements MUST be prepared!");
|
||||
}
|
||||
@@ -69,8 +102,6 @@ public class JDBCDMLOpDispenser extends JDBCBaseOpDispenser {
|
||||
|
||||
@Override
|
||||
public JDBCDMLOp apply(long cycle) {
|
||||
checkShutdownEntry(cycle);
|
||||
|
||||
if (isReadStatement) {
|
||||
return new JDBCDMLReadOp(
|
||||
jdbcSpace,
|
||||
|
||||
@@ -33,20 +33,11 @@ public class JDBCDDLOp extends JDBCOp {
|
||||
this.ddlStmtStr = ddlStmtStr;
|
||||
}
|
||||
|
||||
private Statement createDDLStatement() {
|
||||
try {
|
||||
return jdbcConnection.createStatement();
|
||||
} catch (SQLException e) {
|
||||
throw new JDBCAdapterUnexpectedException(
|
||||
"Unable to create a regular (non-prepared) JDBC statement");
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public Object apply(long value) {
|
||||
try {
|
||||
Statement stmt = createDDLStatement();
|
||||
Statement stmt = jdbcConnection.createStatement();
|
||||
stmt.execute(ddlStmtStr);
|
||||
closeStatement(stmt);
|
||||
return true;
|
||||
} catch (SQLException sqlException) {
|
||||
throw new JDBCAdapterUnexpectedException(
|
||||
|
||||
@@ -18,7 +18,6 @@ package io.nosqlbench.adapter.jdbc.optypes;
|
||||
import io.nosqlbench.adapter.jdbc.JDBCSpace;
|
||||
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterInvalidParamException;
|
||||
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException;
|
||||
import io.nosqlbench.adapter.jdbc.exceptions.JDBCPgVectorException;
|
||||
import io.nosqlbench.adapter.jdbc.utils.JDBCPgVector;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
@@ -73,12 +72,12 @@ public abstract class JDBCDMLOp extends JDBCOp {
|
||||
}
|
||||
|
||||
// Only applicable to a prepared statement
|
||||
protected PreparedStatement setPrepStmtValues(PreparedStatement stmt, List<Object> valList) {
|
||||
protected PreparedStatement setPrepStmtValues(PreparedStatement stmt) throws SQLException {
|
||||
assert (stmt != null);
|
||||
|
||||
for (int i=0; i<valList.size(); i++) {
|
||||
for (int i=0; i<pStmtValList.size(); i++) {
|
||||
int fieldIdx = i + 1;
|
||||
Object fieldValObj = valList.get(i);
|
||||
Object fieldValObj = pStmtValList.get(i);
|
||||
assert (fieldValObj != null);
|
||||
|
||||
try {
|
||||
@@ -98,8 +97,8 @@ public abstract class JDBCDMLOp extends JDBCOp {
|
||||
}
|
||||
stmt.setObject(fieldIdx, fieldValObj);
|
||||
}
|
||||
catch (JDBCPgVectorException | SQLException e) {
|
||||
throw new RuntimeException(
|
||||
catch ( SQLException e) {
|
||||
throw new SQLException(
|
||||
"Failed to parse the prepared statement value for field[" + fieldIdx + "] " + fieldValObj);
|
||||
}
|
||||
}
|
||||
@@ -107,41 +106,31 @@ public abstract class JDBCDMLOp extends JDBCOp {
|
||||
return stmt;
|
||||
}
|
||||
|
||||
protected void processCommit() {
|
||||
try {
|
||||
if (!jdbcConnection.getAutoCommit()) {
|
||||
jdbcConnection.commit();
|
||||
LOGGER.debug(() -> LOG_COMMIT_SUCCESS);
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
throw new JDBCAdapterUnexpectedException("Failed to process JDBC statement commit!");
|
||||
|
||||
protected void processCommit() throws SQLException {
|
||||
if (!jdbcConnection.getAutoCommit()) {
|
||||
jdbcConnection.commit();
|
||||
LOGGER.debug(() -> LOG_COMMIT_SUCCESS);
|
||||
}
|
||||
}
|
||||
|
||||
protected Statement createDMLStatement() {
|
||||
protected Statement createDMLStatement() throws SQLException {
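// The statement is cached per thread: created once (prepared or plain) and
// fetched from the ThreadLocal on subsequent cycles.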
|
||||
Statement stmt = jdbcStmtTL.get();
|
||||
if (stmt == null) {
|
||||
if (isPreparedStmt)
|
||||
stmt = jdbcConnection.prepareStatement(pStmtSqlStr);
|
||||
else
|
||||
stmt = jdbcConnection.createStatement();
|
||||
|
||||
try {
|
||||
if (stmt == null) {
|
||||
if (isPreparedStmt)
|
||||
stmt = jdbcConnection.prepareStatement(pStmtSqlStr);
|
||||
else
|
||||
stmt = jdbcConnection.createStatement();
|
||||
jdbcStmtTL.set(stmt);
|
||||
|
||||
jdbcStmtTL.set(stmt);
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("A statement is created -- prepared: {}, read/write: {}, stmt: {}",
|
||||
isPreparedStmt,
|
||||
isReadStmt ? "read" : "write",
|
||||
stmt);
|
||||
}
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("A statement is created -- prepared: {}, read/write: {}, stmt: {}",
|
||||
isPreparedStmt,
|
||||
isReadStmt ? "read" : "write",
|
||||
stmt);
|
||||
}
|
||||
|
||||
return stmt;
|
||||
} catch (SQLException e) {
|
||||
throw new JDBCAdapterUnexpectedException(
|
||||
"Unable to create a prepared JDBC statement");
|
||||
}
|
||||
return stmt;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,12 +41,12 @@ public class JDBCDMLReadOp extends JDBCDMLOp {
|
||||
|
||||
@Override
|
||||
public Object apply(long value) {
|
||||
Statement stmt = super.createDMLStatement();
|
||||
if (isPreparedStmt) {
|
||||
stmt = super.setPrepStmtValues((PreparedStatement) stmt, this.pStmtValList);
|
||||
}
|
||||
try {
|
||||
Statement stmt = super.createDMLStatement();
|
||||
if (isPreparedStmt) {
|
||||
stmt = setPrepStmtValues((PreparedStatement) stmt);
|
||||
}
|
||||
|
||||
try {
|
||||
// key string list to be used in the "Vector" relevancy score verification
|
||||
List<String> verifierValueList = new ArrayList<>();
|
||||
|
||||
@@ -59,7 +59,6 @@ public class JDBCDMLReadOp extends JDBCDMLOp {
|
||||
verifierValueList.add(keyVal);
|
||||
}
|
||||
} while (rs.next());
|
||||
closeStatement(stmt);
|
||||
}
|
||||
else {
|
||||
boolean isResultSet = ((PreparedStatement)stmt).execute();
|
||||
@@ -82,8 +81,6 @@ public class JDBCDMLReadOp extends JDBCDMLOp {
|
||||
}
|
||||
isResultSet = stmt.getMoreResults();
|
||||
}
|
||||
|
||||
closeStatement(stmt);
|
||||
}
|
||||
|
||||
return verifierValueList;
|
||||
|
||||
@@ -20,6 +20,7 @@ import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.util.List;
|
||||
@@ -44,24 +45,22 @@ public class JDBCDMLWriteOp extends JDBCDMLOp {
|
||||
@Override
|
||||
public Object apply(long value) {
|
||||
int trackingCnt = threadBatchTrackingCntTL.get();
|
||||
trackingCnt = trackingCnt + 1;
|
||||
trackingCnt++;
|
||||
threadBatchTrackingCntTL.set(trackingCnt);
|
||||
|
||||
PreparedStatement stmt = (PreparedStatement) super.createDMLStatement();
|
||||
stmt = super.setPrepStmtValues(stmt, this.pStmtValList);
|
||||
|
||||
try {
|
||||
assert (isPreparedStmt);
|
||||
PreparedStatement stmt = (PreparedStatement) super.createDMLStatement();
|
||||
stmt = super.setPrepStmtValues(stmt);
|
||||
|
||||
// No batch
|
||||
if (ddlStmtBatchNum == 1) {
|
||||
int result_cnt = stmt.executeUpdate();
|
||||
super.processCommit();
|
||||
closeStatement(stmt);
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("[single ddl - execution] cycle:{}, result_cnt: {}, stmt: {}",
|
||||
value, result_cnt, stmt);
|
||||
}
|
||||
|
||||
return result_cnt;
|
||||
}
|
||||
// Use batch
|
||||
@@ -71,12 +70,12 @@ public class JDBCDMLWriteOp extends JDBCDMLOp {
|
||||
LOGGER.debug("[batch ddl - adding to batch] cycle:{}, stmt: {}",
|
||||
value, stmt);
|
||||
}
|
||||
|
||||
if ( (trackingCnt % ddlStmtBatchNum == 0) || jdbcSpace.isShuttingDown() ) {
|
||||
// NOTE: if the total number of cycles is not a multiple of the batch number,
// the records in the final, partially filled batch may never be written to the database
// (e.g. a thread that runs 1,000 cycles with dml_batch=120 leaves its last 40 statements unflushed).
// To avoid this, make sure the total cycle number is a multiple of the batch number.
|
||||
if (trackingCnt % ddlStmtBatchNum == 0) {
|
||||
int[] counts = stmt.executeBatch();
|
||||
processCommit();
|
||||
closeStatement(stmt);
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("[batch ddl - execution] cycle:{}, total_batch_res_cnt:{}, stmt: {}",
|
||||
value, counts, stmt);
|
||||
|
||||
@@ -18,14 +18,14 @@ package io.nosqlbench.adapter.jdbc.optypes;
|
||||
|
||||
import io.nosqlbench.adapter.jdbc.JDBCSpace;
|
||||
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException;
|
||||
import io.nosqlbench.adapter.jdbc.utils.JDBCPgVector;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.sql.*;
|
||||
import java.util.Properties;
|
||||
import java.util.Random;
|
||||
|
||||
public abstract class JDBCOp implements CycleOp {
|
||||
private static final Logger LOGGER = LogManager.getLogger(JDBCOp.class);
|
||||
@@ -34,18 +34,50 @@ public abstract class JDBCOp implements CycleOp {
|
||||
|
||||
protected final JDBCSpace jdbcSpace;
|
||||
protected final Connection jdbcConnection;
|
||||
private final Random random = new Random();
|
||||
|
||||
public JDBCOp(JDBCSpace jdbcSpace) {
|
||||
this.jdbcSpace = jdbcSpace;
|
||||
String curThreadName = Thread.currentThread().getName();
|
||||
this.jdbcConnection = this.jdbcSpace.getConnection(curThreadName);
|
||||
this.jdbcConnection = getConnection();
|
||||
}
|
||||
|
||||
protected void closeStatement(Statement stmt) throws SQLException {
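// Plain statements are closed right away; prepared statements are reused across cycles
// (see the ThreadLocal cache in JDBCDMLOp) and are only closed once the space is shutting down.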
|
||||
if (! (stmt instanceof PreparedStatement)) {
|
||||
stmt.close();
|
||||
} else if (jdbcSpace.isShuttingDown()) {
|
||||
stmt.close();
|
||||
}
|
||||
private Connection getConnection() {
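// Bind this op to one of the 'maxNumConn' named connections chosen at random;
// the space's cache creates the connection on first use and reuses it afterwards.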
|
||||
int rnd = random.nextInt(0, jdbcSpace.getMaxNumConn());
|
||||
final String connectionName = "jdbc-conn-" + rnd;
|
||||
|
||||
return jdbcSpace.getConnection(
|
||||
new JDBCSpace.ConnectionCacheKey(connectionName), () -> {
|
||||
try {
|
||||
Connection connection;
|
||||
|
||||
if (jdbcSpace.useHikariCP()) {
|
||||
connection = jdbcSpace.getHikariDataSource().getConnection();
|
||||
}
|
||||
// Use DriverManager directly
|
||||
else {
|
||||
String url = jdbcSpace.getConnConfig().getJdbcUrl();
|
||||
Properties props = jdbcSpace.getConnConfig().getDataSourceProperties();
|
||||
props.put("user", jdbcSpace.getConnConfig().getUsername());
|
||||
props.put("password", jdbcSpace.getConnConfig().getPassword());
|
||||
connection = DriverManager.getConnection(url, props);
|
||||
}
|
||||
|
||||
// Register 'vector' type
|
||||
JDBCPgVector.addVectorType(connection);
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("A new JDBC connection ({}) is successfully created: {}",
|
||||
connectionName, connection);
|
||||
}
|
||||
|
||||
return connection;
|
||||
}
|
||||
catch (Exception ex) {
|
||||
String exp = "Exception occurred while attempting to create a connection (useHikariCP=" +
|
||||
jdbcSpace.useHikariCP() + ")";
|
||||
LOGGER.error(exp, ex);
|
||||
throw new JDBCAdapterUnexpectedException(exp);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,62 +0,0 @@
|
||||
# run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags="block:schema" threads=AUTO cycles=4 url="jdbc:postgresql://host:port/database" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName=insectdb sslrootcert="/path/to/postgresql_certs/root.crt" -vv --show-stacktraces
|
||||
min_version: "5.17.2"
|
||||
|
||||
description: |
|
||||
A workload with only text keys and text values. This is based on the CQL keyvalue workloads as found
|
||||
in cql-keyvalue2.yaml.
|
||||
|
||||
scenarios:
|
||||
default:
|
||||
schema: run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags==block:schema threads=1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
|
||||
rampup: run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags==block:rampup threads=AUTO cycles===TEMPLATE(rampup-cycles,100) url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
|
||||
main: run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags==block:'main.*' threads=AUTO cycles===TEMPLATE(main-cycles,100) url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
|
||||
|
||||
params:
|
||||
instrument: TEMPLATE(instrument,false)
|
||||
|
||||
bindings:
|
||||
seq_key: Mod(TEMPLATE(keycount,1000000000)); ToString() -> String
|
||||
seq_value: Hash(); Mod(TEMPLATE(valuecount,1000000000)); ToString() -> String
|
||||
rw_key: <<keydist:Uniform(0,1000000000)->int>>; ToString() -> String
|
||||
rw_value: Hash(); <<valdist:Uniform(0,1000000000)->int>>; ToString() -> String
|
||||
|
||||
blocks:
|
||||
schema:
|
||||
ops:
|
||||
drop-database:
|
||||
execute: |
|
||||
DROP DATABASE IF EXISTS TEMPLATE(database,baselines);
|
||||
create-database:
|
||||
execute: |
|
||||
CREATE DATABASE IF NOT EXISTS TEMPLATE(database,baselines);
|
||||
drop-table:
|
||||
execute: |
|
||||
DROP TABLE IF EXISTS TEMPLATE(database,baselines).TEMPLATE(table,keyvalue);
|
||||
create-table:
|
||||
execute: |
|
||||
CREATE TABLE IF NOT EXISTS TEMPLATE(database,baselines).TEMPLATE(table,keyvalue)
|
||||
(key STRING PRIMARY KEY, value STRING);
|
||||
|
||||
rampup:
|
||||
params:
|
||||
ops:
|
||||
rampup-insert:
|
||||
update: |
|
||||
INSERT INTO TEMPLATE(database,baselines).TEMPLATE(table,keyvalue)
|
||||
(key, value) VALUES ({seq_key},{seq_value});
|
||||
|
||||
main-read:
|
||||
params:
|
||||
ratio: TEMPLATE(read_ratio,5)
|
||||
ops:
|
||||
main-select:
|
||||
query: |
|
||||
SELECT * FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,keyvalue) WHERE key='{rw_key}';
|
||||
main-write:
|
||||
params:
|
||||
ratio: TEMPLATE(write_ratio,5)
|
||||
ops:
|
||||
main-insert:
|
||||
update: |
|
||||
INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,keyvalue)
|
||||
(key, value) VALUES ('{rw_key}', '{rw_value}');
|
||||
@@ -1,157 +0,0 @@
|
||||
min_version: "5.17.2"
|
||||
|
||||
description: |
|
||||
A tabular workload with partitions, clusters, and data fields
|
||||
This workload contains partitioning and clustering along with a set
|
||||
of 8 fields of varying length. The field values vary in size according
|
||||
to the fibonacci sequence times a base size factor of 10, with
|
||||
an additional 10% variance for each field.
|
||||
The read patterns have a variety of field subsets specified.
|
||||
|
||||
During rampup, all rows will be written partition by partition,
|
||||
filling in all rows of that partition before moving on to the next.
|
||||
Example: With a partition size of 1000 and 1B rows, there will be
|
||||
1000000 partitions.
|
||||
|
||||
During main phase, the read patterns are varied with different
|
||||
field sets. As well, the number of rows which will be returned
|
||||
is varied between 1 and 10.
|
||||
|
||||
By default, reads occur at the same ratio as writes, with main
|
||||
phase writes writing full rows.
|
||||
|
||||
You can bulk up the size of the payloads by 10x with addzeroes='0',
|
||||
by 100x with addzeroes='00', and so on, but if you want to go higher
|
||||
than 100x, you'll need to modify the workload with a larger reference
|
||||
file in the HashedFileExtractToString(...) bindings.
|
||||
|
||||
scenarios:
|
||||
default:
|
||||
schema: run driver=jdbc tags==block:schema cycles==UNDEF threads==1
|
||||
rampup: run driver=jdbc tags==block:rampup cycles===TEMPLATE(rampup-cycles,100) threads=auto
|
||||
main: run driver=jdbc tags==block:'main.*' cycles===TEMPLATE(main-cycles,100) threads=auto
|
||||
|
||||
params:
|
||||
instrument: TEMPLATE(instrument,false)
|
||||
|
||||
bindings:
|
||||
# for ramp-up and verify phases
|
||||
#
|
||||
part_layout: Div(<<partsize:1000>>); ToString() -> String
|
||||
clust_layout: Mod(<<partsize:1000>>); ToString() -> String
|
||||
# todo: update these definitions to use the simpler 10,0.1, 20, 0.2, ...
|
||||
data0: Add(10); HashedFileExtractToString('data/lorem_ipsum_full.txt',9TEMPLATE(addzeroes,),11TEMPLATE(addzeroes,))
|
||||
data1: Add(20); HashedFileExtractToString('data/lorem_ipsum_full.txt',18TEMPLATE(addzeroes,),22TEMPLATE(addzeroes,))
|
||||
data2: Add(30); HashedFileExtractToString('data/lorem_ipsum_full.txt',27TEMPLATE(addzeroes,),33TEMPLATE(addzeroes,))
|
||||
data3: Add(40); HashedFileExtractToString('data/lorem_ipsum_full.txt',45TEMPLATE(addzeroes,),55TEMPLATE(addzeroes,))
|
||||
data4: Add(50); HashedFileExtractToString('data/lorem_ipsum_full.txt',72TEMPLATE(addzeroes,),88TEMPLATE(addzeroes,))
|
||||
data5: Add(60); HashedFileExtractToString('data/lorem_ipsum_full.txt',107TEMPLATE(addzeroes,),143TEMPLATE(addzeroes,))
|
||||
data6: Add(70); HashedFileExtractToString('data/lorem_ipsum_full.txt',189TEMPLATE(addzeroes,),231TEMPLATE(addzeroes,))
|
||||
data7: Add(80); HashedFileExtractToString('data/lorem_ipsum_full.txt',306TEMPLATE(addzeroes,),374TEMPLATE(addzeroes,))
|
||||
|
||||
# for main phase
|
||||
# for write
|
||||
part_write: Hash(); Uniform(0,TEMPLATE(partcount,100))->int; ToString() -> String
|
||||
clust_write: Hash(); Add(1); Uniform(0,TEMPLATE(partsize,1000000))->int; ToString() -> String
|
||||
data_write: Hash(); HashedFileExtractToString('data/lorem_ipsum_full.txt',50,150) -> String
|
||||
|
||||
# for read
|
||||
limit: Uniform(1,10) -> int
|
||||
part_read: Uniform(0,TEMPLATE(partcount,100))->int; ToString() -> String
|
||||
clust_read: Add(1); Uniform(0,TEMPLATE(partsize,1000000))->int; ToString() -> String
|
||||
|
||||
blocks:
|
||||
schema:
|
||||
params:
|
||||
prepared: false
|
||||
ops:
|
||||
#drop-database:
|
||||
# execute: |
|
||||
# DROP DATABASE IF EXISTS TEMPLATE(database,baselines);
|
||||
create-database:
|
||||
execute: |
|
||||
CREATE DATABASE IF NOT EXISTS TEMPLATE(database,baselines);
|
||||
drop-table:
|
||||
execute: |
|
||||
DROP TABLE IF EXISTS TEMPLATE(database,baselines).TEMPLATE(table,tabular);
|
||||
create-table:
|
||||
execute: |
|
||||
CREATE TABLE IF NOT EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular) (
|
||||
part STRING,
|
||||
clust STRING,
|
||||
data0 STRING, data1 STRING, data2 STRING, data3 STRING,
|
||||
data4 STRING, data5 STRING, data6 STRING, data7 STRING,
|
||||
PRIMARY KEY (part,clust)
|
||||
);
|
||||
|
||||
rampup:
|
||||
params:
|
||||
ops:
|
||||
rampup-insert:
|
||||
update: |
|
||||
INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
|
||||
(part,clust,data0,data1,data2,data3,data4,data5,data6,data7)
|
||||
VALUES (
|
||||
'{part_layout}','{clust_layout}','{data0}','{data1}','{data2}',
|
||||
'{data3}','{data4}','{data5}','{data6}','{data7}'
|
||||
);
|
||||
|
||||
verify:
|
||||
params:
|
||||
cl: TEMPLATE(read_cl,LOCAL_QUORUM)
|
||||
ops:
|
||||
verify-select:
|
||||
query: |
|
||||
SELECT * FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
|
||||
WHERE part='{part_layout}'
|
||||
AND clust='{clust_layout}'
|
||||
|
||||
main-read:
|
||||
params:
|
||||
ratio: TEMPLATE(read_ratio,1)
|
||||
ops:
|
||||
main-select-all:
|
||||
query: |
|
||||
SELECT * FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
|
||||
WHERE part='{part_read}' LIMIT {limit};
|
||||
main-select-01:
|
||||
query: |
|
||||
SELECT data0,data1 from TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
|
||||
WHERE part='{part_read}' LIMIT {limit};
|
||||
main-select-0246:
|
||||
query: |
|
||||
SELECT data0,data2,data4,data6 FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
|
||||
WHERE part='{part_read}' LIMIT {limit};
|
||||
main-select-1357:
|
||||
query: |
|
||||
SELECT data1,data3,data5,data7 FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
|
||||
WHERE part='{part_read}' LIMIT {limit};
|
||||
main-select-0123:
|
||||
query: |
|
||||
SELECT data0,data1,data2,data3 FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
|
||||
WHERE part='{part_read}' LIMIT {limit};
|
||||
main-select-4567:
|
||||
query: |
|
||||
SELECT data4,data5,data6,data7 FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
|
||||
WHERE part='{part_read}' LIMIT {limit};
|
||||
main-select-67:
|
||||
query: |
|
||||
SELECT data6,data7 FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
|
||||
WHERE part='{part_read}' LIMIT {limit};
|
||||
main-select:
|
||||
query: |
|
||||
SELECT data0,data1,data2,data3,data4,data5,data6,data7
|
||||
FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
|
||||
WHERE part='{part_read}' LIMIT {limit};
|
||||
main-write:
|
||||
params:
|
||||
ratio: TEMPLATE(write_ratio,8)
|
||||
ops:
|
||||
main-write:
|
||||
update: |
|
||||
INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,tabular)
|
||||
(part, clust, data0,data1,data2,data3,data4,data5,data6,data7)
|
||||
VALUES (
|
||||
'{part_write}','{clust_write}','{data0}','{data1}','{data2}',
|
||||
'{data3}','{data4}','{data5}','{data6}','{data7}'
|
||||
);
|
||||
@@ -1,85 +0,0 @@
|
||||
# java -jar nb5.jar cockroachdb-timeseries default -vv --show-stacktraces
|
||||
min_version: "5.17.2"
|
||||
|
||||
description: |
|
||||
This workload emulates a time-series data model and access patterns.
|
||||
|
||||
scenarios:
|
||||
default:
|
||||
schema: run driver=jdbc tags==block:schema cycles==UNDEF threads==1 url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
|
||||
rampup: run driver=jdbc tags==block:rampup cycles===TEMPLATE(rampup-cycles,100) threads=auto url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
|
||||
main: run driver=jdbc tags==block:'main.*' cycles===TEMPLATE(main-cycles,100) threads=auto url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
|
||||
|
||||
params:
|
||||
instrument: TEMPLATE(instrument,false)
|
||||
|
||||
bindings:
|
||||
machine_id: Mod(TEMPLATE(sources,10000)); ToHashedUUID() -> java.util.UUID
|
||||
sensor_name: HashedLineToString('data/variable_words.txt')
|
||||
time: Mul(TEMPLATE(timespeed,100)L); Div(TEMPLATE(sources,10000)L); ToJavaInstant()
|
||||
cell_timestamp: Mul(TEMPLATE(timespeed,100)L); Div(TEMPLATE(sources,10000)L); Mul(1000L)
|
||||
sensor_value: Normal(0.0,5.0); Add(100.0) -> double
|
||||
station_id: Div(TEMPLATE(sources,10000));Mod(TEMPLATE(stations,100)); ToHashedUUID() -> java.util.UUID
|
||||
data: HashedFileExtractToString('data/lorem_ipsum_full.txt',800TEMPLATE(addzeroes,),1200TEMPLATE(addzeroes,))
|
||||
|
||||
blocks:
|
||||
schema:
|
||||
params:
|
||||
ops:
|
||||
drop-database:
|
||||
#execute: |
|
||||
# DROP DATABASE IF EXISTS TEMPLATE(database,baselines);
|
||||
create-database:
|
||||
execute: |
|
||||
CREATE DATABASE IF NOT EXISTS TEMPLATE(database,baselines);
|
||||
drop-table:
|
||||
execute: |
|
||||
DROP TABLE IF EXISTS TEMPLATE(database,baselines).TEMPLATE(table,iot);
|
||||
create-table:
|
||||
execute: |
|
||||
CREATE TABLE IF NOT EXISTS TEMPLATE(keyspace,baselines).TEMPLATE(table,iot) (
|
||||
machine_id UUID,
|
||||
sensor_name STRING,
|
||||
time TIMESTAMP,
|
||||
sensor_value FLOAT,
|
||||
station_id UUID,
|
||||
data STRING,
|
||||
PRIMARY KEY (machine_id ASC, sensor_name ASC),
|
||||
INDEX TEMPLATE(table,iot)_time_desc_idx (time DESC)
|
||||
);
|
||||
|
||||
rampup:
|
||||
params:
|
||||
ops:
|
||||
insert-rampup:
|
||||
update: |
|
||||
INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,iot)
|
||||
(machine_id, sensor_name, time, sensor_value, station_id, data)
|
||||
VALUES (
|
||||
'{machine_id}', '{sensor_name}', '{time}', {sensor_value}, '{station_id}', '{data}'
|
||||
);
|
||||
|
||||
#using timestamp {cell_timestamp}
|
||||
|
||||
main-read:
|
||||
params:
|
||||
ratio: TEMPLATE(read_ratio,1)
|
||||
ops:
|
||||
select-read:
|
||||
query: |
|
||||
SELECT * FROM TEMPLATE(keyspace,baselines).TEMPLATE(table,iot)
|
||||
WHERE machine_id='{machine_id}' and sensor_name='{sensor_name}'
|
||||
LIMIT TEMPLATE(limit,10);
|
||||
main-write:
|
||||
params:
|
||||
ratio: TEMPLATE(write_ratio,9)
|
||||
ops:
|
||||
insert-main:
|
||||
update: |
|
||||
INSERT INTO TEMPLATE(keyspace,baselines).TEMPLATE(table,iot)
|
||||
(machine_id, sensor_name, time, sensor_value, station_id, data)
|
||||
VALUES (
|
||||
'{machine_id}', '{sensor_name}', '{time}', {sensor_value}, '{station_id}', '{data}'
|
||||
);
|
||||
|
||||
#using timestamp {cell_timestamp}
|
||||
@@ -0,0 +1,100 @@
|
||||
# run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags="block:schema" threads=AUTO cycles=4 url="jdbc:postgresql://host:port/database" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName=insectdb sslrootcert="/path/to/postgresql_certs/root.crt" -vv --show-stacktraces
|
||||
min_version: "5.17.2"
|
||||
|
||||
description: |
|
||||
A workload which reads ann-benchmarks vector data in HDF5 file format for PostgreSQL with pgvector.
|
||||
|
||||
scenarios:
|
||||
default:
|
||||
# supabase environment
|
||||
drop: run driver=jdbc tags==block:drop threads===1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
|
||||
schema: run driver=jdbc tags==block:schema threads===1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
|
||||
testann: run driver=jdbc tags==block:testann threads=AUTO cycles===TEMPLATE(main-cycles,1000) url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
|
||||
train: run driver=jdbc tags==block:train threads=AUTO cycles===TEMPLATE(trainsize) dml_batch=120 autoCommit=false url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
|
||||
|
||||
bindings:
|
||||
rw_key: ToString();
|
||||
train_floatlist: HdfFileToFloatList("testdata/TEMPLATE(datasetfile).hdf5", "/train"); ToCqlVector();
|
||||
test_floatlist: HdfFileToFloatList("testdata/TEMPLATE(datasetfile).hdf5", "/test"); ToCqlVector();
|
||||
relevant_indices: HdfFileToIntArray("testdata/TEMPLATE(datasetfile).hdf5", "/neighbors")
|
||||
|
||||
blocks:
|
||||
drop:
|
||||
ops:
|
||||
drop_vector_index:
|
||||
ddl: |
|
||||
DROP INDEX IF EXISTS idx_TEMPLATE(tablename,baseline)_TEMPLATE(indextype)_TEMPLATE(similarity_function);
|
||||
drop_table:
|
||||
ddl: |
|
||||
DROP TABLE IF EXISTS TEMPLATE(schemaname,public).TEMPLATE(tablename,baseline);
|
||||
##
|
||||
# NOTE: Do NOT enable this block for 'runall.sh' script
|
||||
# --------------------------------------------------
|
||||
# drop_schema:
|
||||
# ddl: |
|
||||
# DROP SCHEMA IF EXISTS TEMPLATE(schemaname,public);
|
||||
|
||||
schema:
|
||||
ops:
|
||||
create_schema:
|
||||
ddl: |
|
||||
CREATE SCHEMA IF NOT EXISTS TEMPLATE(schemaname,public);
|
||||
create_table:
|
||||
ddl: |
|
||||
CREATE TABLE IF NOT EXISTS TEMPLATE(schemaname,public).TEMPLATE(tablename,baseline)
|
||||
(key TEXT PRIMARY KEY, value vector(TEMPLATE(dimensions,5)));
|
||||
create_vector_index:
|
||||
ddl: |
|
||||
CREATE INDEX IF NOT EXISTS idx_TEMPLATE(tablename,baseline)_TEMPLATE(indextype)_TEMPLATE(similarity_function)
|
||||
ON TEMPLATE(schemaname,public).TEMPLATE(tablename,baseline)
|
||||
USING TEMPLATE(indextype) (value vector_TEMPLATE(similarity_function)_ops)
|
||||
WITH (TEMPLATE(indexopt));
|
||||
|
||||
train:
|
||||
params:
|
||||
prepared: true
|
||||
ops:
|
||||
main_insert:
|
||||
dmlwrite: |
|
||||
INSERT INTO TEMPLATE(schemaname,public).TEMPLATE(tablename,baseline) VALUES (?,?)
|
||||
ON CONFLICT DO NOTHING;
|
||||
prep_stmt_val_arr: |
|
||||
{rw_key},{train_floatlist}
|
||||
|
||||
testann:
|
||||
params:
|
||||
prepared: true
|
||||
ops:
|
||||
# NOTE: right now this is only for cosine similarity.
|
||||
# in pgvector, '<=>' is for cosine similarity
|
||||
# '<->' is for euclidean distance
|
||||
# '<#>' is for inner product
|
||||
main_select:
|
||||
dmlread: |
|
||||
SELECT *
|
||||
FROM TEMPLATE(schemaname,public).TEMPLATE(tablename,baseline)
|
||||
ORDER BY value <=> ?
|
||||
LIMIT TEMPLATE(top_k,100);
|
||||
prep_stmt_val_arr: |
|
||||
{test_floatlist}
|
||||
#################################
|
||||
## NOTE:
|
||||
# 1). The script blocks below are ONLY relevant with Vector relevancy score verification
|
||||
# 2). The "verifier-key" must match the Vector data identifier column name (e.g. primary key name)
|
||||
# right now the identifier must be a type that can be converted to int.
|
||||
verifier-key: "key"
|
||||
verifier-init: |
|
||||
relevancy=scriptingmetrics.newRelevancyMeasures(_parsed_op);
|
||||
k=TEMPLATE(top_k,100)
|
||||
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.recall("recall",k));
|
||||
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.precision("precision",k));
|
||||
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.F1("F1",k));
|
||||
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.reciprocal_rank("RR",k));
|
||||
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.average_precision("AP",k));
|
||||
verifier: |
|
||||
// driver-specific function
|
||||
actual_indices=pgvec_utils.getValueListForVerifierKey(result);
|
||||
// driver-agnostic function
|
||||
relevancy.accept({relevant_indices},actual_indices);
|
||||
// because we are "verifying" although this needs to be reorganized
|
||||
return true;
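For reference, here is a minimal, self-contained Java sketch of what a recall@k check conceptually computes when the ground-truth neighbor indices are compared against the keys returned by the ANN query above. It is only an illustration under one common definition of recall; it is not the NoSQLBench RelevancyFunctions implementation, and the class, method, and variable names are invented for the example.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative sketch only: shows what "recall@k" means for the verifier block above.
public final class RecallSketch {
    // recall@k = |relevant ∩ top-k returned| / min(k, |relevant|)
    static double recallAtK(List<Integer> relevant, List<Integer> returned, int k) {
        Set<Integer> expected = new HashSet<>(relevant);
        long hits = returned.stream().limit(k).filter(expected::contains).count();
        return (double) hits / Math.min(k, expected.size());
    }

    public static void main(String[] args) {
        List<Integer> relevantIndices = List.of(1, 2, 3, 4, 5); // e.g. ground truth from the HDF5 "/neighbors" dataset
        List<Integer> actualIndices = List.of(2, 9, 3, 1, 7);   // e.g. keys returned by the LIMIT-k query
        System.out.println(recallAtK(relevantIndices, actualIndices, 5)); // prints 0.6
    }
}

The same pattern extends to precision@k by dividing the hit count by k instead of by the number of relevant items.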
|
||||
@@ -1,152 +0,0 @@
|
||||
# run driver=jdbc workload="/path/to/postgresql-keyvalue.yaml" tags="block:schema" threads=AUTO cycles=4 url="jdbc:postgresql://host:port/database" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName=insectdb sslrootcert="/path/to/postgresql_certs/root.crt" -vv --show-stacktraces
|
||||
min_version: "5.17.2"
|
||||
|
||||
description: |
|
||||
A workload which reads ann-benchmarks vector data in HDF5 file format for PostgreSQL with pgvector.
|
||||
|
||||
scenarios:
|
||||
default:
|
||||
####
|
||||
# The following CLI parameters are required for all named scenarios:
|
||||
# - schema: schema name
|
||||
# - table: table name
|
||||
####
|
||||
|
||||
###
|
||||
## For DDL workload, turn on 'AutoCommit'. Turning it off will cause errors.
|
||||
###
|
||||
drop-tbl: run driver=jdbc tags==block:drop-tbl threads==1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" autoCommit="true"
|
||||
# The following CLI parameters is needed for 'create-tbl' named scenario:
|
||||
# - dimensions: vector dimension size (MUST match the actual ANN benchmark data)
|
||||
create-tbl: run driver=jdbc tags==block:create-tbl threads==1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" autoCommit="true"
|
||||
#
|
||||
# Vectors with up to 2,000 dimensions can be indexed.
|
||||
#
|
||||
# The following extra CLI parameter is needed for both 'create-vec-idx' and 'drop-vec-idx' named scenarios:
|
||||
# - indexName: index name
|
||||
drop-vec-idx: run driver=jdbc tags==block:drop-vec-idx threads==1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" autoCommit="true"
|
||||
# The following extra CLI parameters are needed for 'create-vec-idx' named scenario:
|
||||
# - indexType: index type; valid values: 'ivfflat' or 'hnsw' (see: https://github.com/pgvector/pgvector#indexing)
|
||||
# - indexOpt: index options
|
||||
# * for 'ivfflat' index type, the option is like: "lists=<number>"
|
||||
# * for 'hnsw' index type, the option is like: "m=<number>,ef_construction =<number>"
|
||||
# - relFunc: relevancy function; valid values: 'l2' (L2 distance), 'ip' (Inner product), or 'cosine' (Cosine distance)
|
||||
create-vec-idx: run driver=jdbc tags==block:create-vec-idx threads==1 cycles==UNDEF url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" autoCommit="true"
|
||||
|
||||
###
|
||||
## For DML workload, 'AutoCommit' can be off or on
|
||||
## - MUST be off when batching is used !!
|
||||
###
|
||||
# The following extra CLI parameters is needed for both 'vec-read' and 'vec-write' named scenarios:
|
||||
# - dataset: ANN benchmark testing dataset name
|
||||
#
|
||||
# The following extra CLI parameters is needed for 'vec-read' named scenario:
|
||||
# - queryLimit: the number of the records to be returned as in 'LIMIT <number>' clause
|
||||
vec-read: run driver=jdbc tags==block:vec-read cycles===TEMPLATE(main-cycles,100) threads=AUTO url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt" autoCommit="true"
|
||||
# The following extra CLI parameters is needed for 'vec-write' named scenario:
|
||||
# - trainsize: the number of records to load from the dataset and insert into the table
|
||||
vec-write: run driver=jdbc tags==block:vec-write cycles===TEMPLATE(trainsize) threads=AUTO url="jdbc:postgresql://host:port/" databaseName="defaultdb" portNumber=5432 user="newuser" password="CHANGE_ME" sslmode="prefer" serverName="pgsql" sslrootcert="/path/to/postgresql_certs/root.crt"
|
||||
|
||||
bindings:
|
||||
rw_key: ToString()
|
||||
train_vector: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/train");
|
||||
test_vector: HdfFileToFloatList("testdata/TEMPLATE(dataset).hdf5", "/test");
|
||||
validation_set: HdfFileToIntArray("testdata/TEMPLATE(dataset).hdf5", "/neighbors");
|
||||
|
||||
blocks:
|
||||
drop-tbl:
|
||||
params:
|
||||
# DDL statement must NOT be prepared
|
||||
prepared: false
|
||||
ops:
|
||||
drop-table:
|
||||
ddl: |
|
||||
DROP TABLE IF EXISTS TEMPLATE(schema,public).TEMPLATE(table,pgvec);
|
||||
drop-schema:
|
||||
ddl: |
|
||||
DROP SCHEMA IF EXISTS TEMPLATE(schema,public);
|
||||
|
||||
create-tbl:
|
||||
params:
|
||||
# DDL statement must NOT be prepared
|
||||
prepared: false
|
||||
ops:
|
||||
create-schema:
|
||||
ddl: |
|
||||
CREATE SCHEMA IF NOT EXISTS TEMPLATE(schema,public);
|
||||
create-table:
|
||||
ddl: |
|
||||
CREATE TABLE IF NOT EXISTS TEMPLATE(schema,public).TEMPLATE(table,pgvec)
|
||||
(key TEXT PRIMARY KEY, value vector(<<dimensions:5>>));
|
||||
|
||||
drop-vec-idx:
|
||||
params:
|
||||
# DDL statement must NOT be prepared
|
||||
prepared: false
|
||||
ops:
|
||||
drop-vector-index:
|
||||
ddl: |
|
||||
DROP INDEX IF EXISTS TEMPLATE(schema,public).TEMPLATE(indexName,TEMPLATE(dataset)-idx);
|
||||
|
||||
create-vec-idx:
|
||||
params:
|
||||
# DDL statement must NOT be prepared
|
||||
prepared: false
|
||||
ops:
|
||||
create-vector-index:
|
||||
ddl: |
|
||||
CREATE INDEX IF NOT EXISTS TEMPLATE(indexName,TEMPLATE(dataset)-idx-TEMPLATE(indexType))
|
||||
ON TEMPLATE(schema,public).TEMPLATE(table,pgvec)
|
||||
USING TEMPLATE(indexType) (value vector_TEMPLATE(relFunc)_ops)
|
||||
WITH (TEMPLATE(indexOpt));
|
||||
|
||||
# Using PostgreSQl upsert (INSERT ON CONFLICT statement)
|
||||
vec-write:
|
||||
params:
|
||||
# DML write statement MUST be prepared
|
||||
prepared: true
|
||||
ops:
|
||||
main-insert:
|
||||
dmlwrite: |
|
||||
INSERT INTO TEMPLATE(schema,public).TEMPLATE(table,pgvec) VALUES (?,?)
|
||||
ON CONFLICT DO NOTHING;
|
||||
prep_stmt_val_arr: |
|
||||
{rw_key},{test_vector}
|
||||
|
||||
vec-read:
|
||||
ops:
|
||||
params:
|
||||
# DML READ statement can be prepared or not
|
||||
prepared: true
|
||||
main-select:
|
||||
dmlread: |
|
||||
SELECT key, (value <-> ?) as relevancy, value
|
||||
FROM TEMPLATE(schema,public).TEMPLATE(table,pgvec)
|
||||
ORDER BY value <-> ?
|
||||
LIMIT TEMPLATE(queryLimit,100);
|
||||
prep_stmt_val_arr: |
|
||||
{test_vector},{test_vector}
|
||||
#################################
|
||||
## NOTE:
|
||||
# 1). The script blocks below are ONLY relevant with Vector relevancy score verification
|
||||
# 2). The "verifier-key" must match the Vector data identifier column name (e.g. primary key name)
|
||||
# right now the identifier must be a type that can be converted to int.
|
||||
verifier-key: "key"
|
||||
verifier-imports:
|
||||
- io.nosqlbench.adapter.mongodb.MongoDbUtils
|
||||
verifier-init: |
|
||||
relevancy=scriptingmetrics.newRelevancyMeasures(_parsed_op);
|
||||
for (int k in List.of(100)) {
|
||||
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.recall("recall",k));
|
||||
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.precision("precision",k));
|
||||
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.F1("F1",k));
|
||||
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.reciprocal_rank("RR",k));
|
||||
relevancy.addFunction(io.nosqlbench.engine.extensions.computefunctions.RelevancyFunctions.average_precision("AP",k));
|
||||
}
|
||||
verifier: |
|
||||
// driver-specific function
|
||||
actual_indices=pgvec_utils.getValueListForVerifierKey(result);
|
||||
// driver-agnostic function
|
||||
relevancy.accept({validation_set},actual_indices);
|
||||
// because we are "verifying" although this needs to be reorganized
|
||||
return true;
|
||||