From ce60423ea649c63c3b3786002195848d3ebe77b2 Mon Sep 17 00:00:00 2001 From: yabinmeng Date: Thu, 19 Oct 2023 21:40:45 -0500 Subject: [PATCH] JDBC code temp change - 20131019 09:40pm --- .../nosqlbench/adapter/jdbc/JDBCOpMapper.java | 9 -- .../io/nosqlbench/adapter/jdbc/JDBCSpace.java | 88 ++++++------------- .../opdispensers/JDBCBaseOpDispenser.java | 7 +- .../jdbc/opdispensers/JDBCDDLOpDispenser.java | 1 - .../jdbc/opdispensers/JDBCDMLOpDispenser.java | 35 +++++++- .../adapter/jdbc/optypes/JDBCDDLOp.java | 16 ++-- .../adapter/jdbc/optypes/JDBCDMLOp.java | 34 ++++--- .../adapter/jdbc/optypes/JDBCDMLReadOp.java | 16 ++-- .../adapter/jdbc/optypes/JDBCDMLWriteOp.java | 30 ++++--- .../adapter/jdbc/optypes/JDBCOp.java | 11 --- 10 files changed, 107 insertions(+), 140 deletions(-) diff --git a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/JDBCOpMapper.java b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/JDBCOpMapper.java index d4d793f21..e307c36f7 100644 --- a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/JDBCOpMapper.java +++ b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/JDBCOpMapper.java @@ -52,15 +52,6 @@ public class JDBCOpMapper implements OpMapper { String spaceName = op.getStaticConfigOr("space", "default"); JDBCSpace jdbcSpace = spaceCache.get(spaceName); - int nbThreadNum = NumberUtils.toInt(op.getStaticConfig("threads", String.class)); - int maxConnNum = jdbcSpace.getMaxNumConn(); - if (nbThreadNum > maxConnNum) { - throw new JDBCAdapterInvalidParamException( - "JDBC connection is NOT thread safe. The total NB thread number (" + nbThreadNum + - ") can NOT be greater than the maximum connection number 'num_conn' (" + maxConnNum + ")" - ); - } - /* * If the user provides a body element, then they want to provide the JSON or * a data structure that can be converted into JSON, bypassing any further diff --git a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/JDBCSpace.java b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/JDBCSpace.java index 058bea758..c927e7588 100644 --- a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/JDBCSpace.java +++ b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/JDBCSpace.java @@ -28,15 +28,14 @@ import io.nosqlbench.api.config.standard.NBConfiguration; import io.nosqlbench.api.config.standard.Param; import io.nosqlbench.api.errors.OpConfigError; import org.apache.commons.lang3.BooleanUtils; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.math.NumberUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.sql.Connection; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -48,33 +47,27 @@ public class JDBCSpace implements AutoCloseable { // NOTE: Since JDBC connection is NOT thread-safe, the total NB threads MUST be less // than or equal to this number. This is to make sure one thread per connection. private final static int DEFAULT_CONN_NUM = 5; - private final int maxNumConn; + private int maxNumConn = DEFAULT_CONN_NUM; - // For DML statements, how many statements to put together in one batch + // For DML write statements, how many statements to put together in one batch // - 1 : no batch (default) // - positive number: using batch private final static int DEFAULT_DML_BATCH_NUM = 1; - private final int dmlBatchNum; + private int dmlBatchNum = DEFAULT_DML_BATCH_NUM; private final long totalCycleNum; - private static boolean isShuttingDown = false; - + private final int totalThreadNum; private HikariConfig hikariConfig; private HikariDataSource hikariDataSource; + private final Random random; ConcurrentHashMap connections = new ConcurrentHashMap<>(); public JDBCSpace(String spaceName, NBConfiguration cfg) { this.spaceName = spaceName; this.totalCycleNum = NumberUtils.toLong(cfg.getOptional("cycles").orElse("1")); - int totalThreads = NumberUtils.toInt(cfg.getOptional("threads").orElse("1")); - int numConnInput = NumberUtils.toInt(cfg.getOptional("num_conn").orElse("10")); - this.maxNumConn = Math.min(totalThreads, numConnInput); - if (this.maxNumConn < 1) { - throw new JDBCAdapterInvalidParamException( - "'num_conn' NB CLI parameter must be a positive number!" - ); - } + this.totalThreadNum = NumberUtils.toInt(cfg.getOptional("threads").orElse("1")); + this.random = new Random(); // Must be after the 'maxNumConn' statements and before the rest of the remaining statements! this.initializeSpace(cfg); @@ -95,12 +88,6 @@ public class JDBCSpace implements AutoCloseable { "Using batch, 'dml_batch'(" + this.dmlBatchNum + ") > 1, along with 'autoCommit' ON is not supported!" ); } - - if (logger.isDebugEnabled()) { - logger.debug("{} JDBC connections will be created [max(threads/{}, num_conn/{}]; " + - "dml_batch: {}, autoCommit: {}", - maxNumConn, totalThreads, numConnInput, dmlBatchNum, hikariConfig.isAutoCommit()); - } } @Override @@ -109,42 +96,39 @@ public class JDBCSpace implements AutoCloseable { } public int getMaxNumConn() { return this.maxNumConn; } + public void setMaxNumConn(int i) { this.maxNumConn = i; } public int getDmlBatchNum() { return this.dmlBatchNum; } public long getTotalCycleNum() { return this.totalCycleNum; } - - public boolean isShuttingDown() { return isShuttingDown; } - public void enterShutdownStage() { isShuttingDown = true; } - + public int getTotalThreadNum() { return this.totalThreadNum; } public HikariDataSource getHikariDataSource() { return this.hikariDataSource; } - public Connection getConnection(String connectionName) { - Connection connection = connections.get(connectionName); - if (connection == null) { + public Connection getConnection() { + int rnd = random.nextInt(0, getMaxNumConn()); + final String connectionName = "jdbc-conn-" + rnd; + + return connections.computeIfAbsent(connectionName, key -> { try { - connection = hikariDataSource.getConnection(); + Connection connection = hikariDataSource.getConnection(); if (logger.isDebugEnabled()) { logger.debug("JDBC connection ({}) is successfully created: {}", connectionName, connection); } - // Register 'vector' type JDBCPgVector.addVectorType(connection); - connections.put(connectionName, connection); + return connection; } catch (Exception ex) { String exp = "Exception occurred while attempting to create a connection using the HikariDataSource"; logger.error(exp, ex); throw new JDBCAdapterUnexpectedException(exp); } - } - - return connection; + }); } private void initializeSpace(NBConfiguration cfg) { @@ -210,22 +194,22 @@ public class JDBCSpace implements AutoCloseable { hikariConfig.setAutoCommit(autoCommit); hikariConfig.setKeepaliveTime(Integer.parseInt(cfg.get("keepaliveTime"))); - - // HikariCP "maximumPoolSize" parameter is ignored. // Use the NB "num_conn" parameter instead, wth 20% extra capacity hikariConfig.setMaximumPoolSize((int)Math.ceil(1.2*maxNumConn)); this.hikariDataSource = new HikariDataSource(hikariConfig); + + logger.info("hikariDataSource is created : {}", hikariDataSource); } private void shutdownSpace() { - isShuttingDown = true; - try { - waitUntilAllOpFinished(System.currentTimeMillis()); - + logger.info("Total {} of connections is being closed ...", connections.size()); for (Connection connection : connections.values()) { - connection.close(); + if (logger.isDebugEnabled()) { + logger.debug("Close connection : {}", connection); + connection.close(); + } } } catch (Exception e) { throw new JDBCAdapterUnexpectedException("Unexpected error when trying to close the JDBC connection!"); @@ -234,24 +218,6 @@ public class JDBCSpace implements AutoCloseable { hikariDataSource.close(); } - private void waitUntilAllOpFinished(long shutdownStartTimeMills) { - final int timeToWaitInSec = 5; - long timeElapsedMills; - boolean continueChk; - - do { - JDBCAdapterUtil.pauseCurThreadExec(1); - - long curTimeMills = System.currentTimeMillis(); - timeElapsedMills = curTimeMills - shutdownStartTimeMills; - continueChk = (timeElapsedMills <= (timeToWaitInSec*1000)); - } while (continueChk); - - logger.info( - "shutdownSpace::waitUntilAllOpFinished -- " + - "shutdown time elapsed: " + timeElapsedMills + "ms."); - } - public static NBConfigModel getConfigModel() { return ConfigModel.of(JDBCSpace.class) .add(Param.defaultTo("num_conn", DEFAULT_CONN_NUM) diff --git a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/opdispensers/JDBCBaseOpDispenser.java b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/opdispensers/JDBCBaseOpDispenser.java index e5c520bbc..9abfe45bc 100644 --- a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/opdispensers/JDBCBaseOpDispenser.java +++ b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/opdispensers/JDBCBaseOpDispenser.java @@ -17,10 +17,12 @@ package io.nosqlbench.adapter.jdbc.opdispensers; import io.nosqlbench.adapter.jdbc.JDBCSpace; +import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterInvalidParamException; import io.nosqlbench.adapter.jdbc.optypes.JDBCOp; import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser; import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter; import io.nosqlbench.adapters.api.templating.ParsedOp; +import org.apache.commons.lang3.math.NumberUtils; public abstract class JDBCBaseOpDispenser extends BaseOpDispenser { protected static final String ERROR_STATEMENT_CREATION = @@ -39,9 +41,4 @@ public abstract class JDBCBaseOpDispenser extends BaseOpDispenser numConnInput) { + throw new JDBCAdapterInvalidParamException( + "JDBC connection is NOT thread safe. For write workload, the total NB thread number (" + threadNum + + ") can NOT be greater than the maximum connection number 'num_conn' (" + numConnInput + ")" + ); + } + } + maxNumConnFinal = Math.min(threadNum, maxNumConnFinal); + if (maxNumConnFinal < 1) { + throw new JDBCAdapterInvalidParamException( + "'num_conn' NB CLI parameter must be a positive number!" + ); + } + jdbcSpace.setMaxNumConn(maxNumConnFinal); + + logger.info("Total {} JDBC connections will be created [isReadStmt:{}, threads/{}, num_conn/{}]; " + + "dml_batch: {}, autoCommit: {}", + maxNumConnFinal, isReadStmt, threadNum, numConnInput, + jdbcSpace.getDmlBatchNum(), jdbcSpace.getHikariDataSource().isAutoCommit()); + + // TODO: this is a current limitation applied by this adapter + // improve this behavior by allowing the user to choose if (!isPreparedStatement && !isReadStatement) { throw new JDBCAdapterInvalidParamException("DML write statements MUST be prepared!"); } @@ -69,8 +102,6 @@ public class JDBCDMLOpDispenser extends JDBCBaseOpDispenser { @Override public JDBCDMLOp apply(long cycle) { - checkShutdownEntry(cycle); - if (isReadStatement) { return new JDBCDMLReadOp( jdbcSpace, diff --git a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDDLOp.java b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDDLOp.java index 2577ad91c..42f19d445 100644 --- a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDDLOp.java +++ b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDDLOp.java @@ -33,20 +33,16 @@ public class JDBCDDLOp extends JDBCOp { this.ddlStmtStr = ddlStmtStr; } - private Statement createDDLStatement() { - try { - return jdbcConnection.createStatement(); - } catch (SQLException e) { - throw new JDBCAdapterUnexpectedException( - "Unable to create a regular (non-prepared) JDBC statement"); - } + private Statement createDDLStatement(Connection connection) throws SQLException { + return connection.createStatement(); } @Override public Object apply(long value) { try { - Statement stmt = createDDLStatement(); - stmt.execute(ddlStmtStr); - closeStatement(stmt); + Connection connection = jdbcSpace.getConnection(); + try (Statement stmt = createDDLStatement(connection)) { + stmt.execute(ddlStmtStr); + } return true; } catch (SQLException sqlException) { throw new JDBCAdapterUnexpectedException( diff --git a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLOp.java b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLOp.java index a44e0d1c7..bf3f8bd8a 100644 --- a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLOp.java +++ b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLOp.java @@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; @@ -73,12 +74,12 @@ public abstract class JDBCDMLOp extends JDBCOp { } // Only applicable to a prepared statement - protected PreparedStatement setPrepStmtValues(PreparedStatement stmt, List valList) { + protected PreparedStatement setPrepStmtValues(PreparedStatement stmt) { assert (stmt != null); - for (int i=0; i LOG_COMMIT_SUCCESS); } - } catch (SQLException e) { - throw new JDBCAdapterUnexpectedException("Failed to process JDBC statement commit!"); } } - protected Statement createDMLStatement() { + protected Statement createDMLStatement(Connection connection) throws SQLException { Statement stmt = jdbcStmtTL.get(); - try { - if (stmt == null) { + if (stmt == null) { + if ( (connection!=null) && !(connection.isClosed()) ) { if (isPreparedStmt) - stmt = jdbcConnection.prepareStatement(pStmtSqlStr); + stmt = connection.prepareStatement(pStmtSqlStr); else - stmt = jdbcConnection.createStatement(); + stmt = connection.createStatement(); jdbcStmtTL.set(stmt); @@ -137,11 +136,8 @@ public abstract class JDBCDMLOp extends JDBCOp { stmt); } } - - return stmt; - } catch (SQLException e) { - throw new JDBCAdapterUnexpectedException( - "Unable to create a prepared JDBC statement"); } + + return stmt; } } diff --git a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLReadOp.java b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLReadOp.java index e1d303f79..f9466706c 100644 --- a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLReadOp.java +++ b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLReadOp.java @@ -41,12 +41,13 @@ public class JDBCDMLReadOp extends JDBCDMLOp { @Override public Object apply(long value) { - Statement stmt = super.createDMLStatement(); - if (isPreparedStmt) { - stmt = super.setPrepStmtValues((PreparedStatement) stmt, this.pStmtValList); - } + try { + Connection connection = super.jdbcSpace.getConnection(); + Statement stmt = super.createDMLStatement(connection); + if (isPreparedStmt) { + stmt = setPrepStmtValues((PreparedStatement) stmt); + } - try { // key string list to be used in the "Vector" relevancy score verification List verifierValueList = new ArrayList<>(); @@ -59,11 +60,10 @@ public class JDBCDMLReadOp extends JDBCDMLOp { verifierValueList.add(keyVal); } } while (rs.next()); - closeStatement(stmt); } else { boolean isResultSet = ((PreparedStatement)stmt).execute(); - super.processCommit(); + super.processCommit(connection); while(true) { if(isResultSet) { @@ -82,8 +82,6 @@ public class JDBCDMLReadOp extends JDBCDMLOp { } isResultSet = stmt.getMoreResults(); } - - closeStatement(stmt); } return verifierValueList; diff --git a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLWriteOp.java b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLWriteOp.java index adfa7862d..455002a97 100644 --- a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLWriteOp.java +++ b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCDMLWriteOp.java @@ -20,6 +20,7 @@ import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.List; @@ -44,24 +45,23 @@ public class JDBCDMLWriteOp extends JDBCDMLOp { @Override public Object apply(long value) { int trackingCnt = threadBatchTrackingCntTL.get(); - trackingCnt = trackingCnt + 1; + trackingCnt++; threadBatchTrackingCntTL.set(trackingCnt); - PreparedStatement stmt = (PreparedStatement) super.createDMLStatement(); - stmt = super.setPrepStmtValues(stmt, this.pStmtValList); - try { + assert (isPreparedStmt); + Connection connection = super.jdbcSpace.getConnection(); + PreparedStatement stmt = (PreparedStatement) super.createDMLStatement(connection); + stmt = super.setPrepStmtValues(stmt); + // No batch if (ddlStmtBatchNum == 1) { int result_cnt = stmt.executeUpdate(); - super.processCommit(); - closeStatement(stmt); - + super.processCommit(connection); if (LOGGER.isDebugEnabled()) { LOGGER.debug("[single ddl - execution] cycle:{}, result_cnt: {}, stmt: {}", value, result_cnt, stmt); } - return result_cnt; } // Use batch @@ -71,12 +71,12 @@ public class JDBCDMLWriteOp extends JDBCDMLOp { LOGGER.debug("[batch ddl - adding to batch] cycle:{}, stmt: {}", value, stmt); } - - if ( (trackingCnt % ddlStmtBatchNum == 0) || jdbcSpace.isShuttingDown() ) { + // NOTE: if the total number of cycles is not the multiple of the batch number, + // some records in a batch may not be written to the database + // To avoid this, make sure the total cycle number is the multiple of the batch number + if (trackingCnt % ddlStmtBatchNum == 0) { int[] counts = stmt.executeBatch(); - processCommit(); - closeStatement(stmt); - + processCommit(connection); if (LOGGER.isDebugEnabled()) { LOGGER.debug("[batch ddl - execution] cycle:{}, total_batch_res_cnt:{}, stmt: {}", value, counts, stmt); @@ -88,6 +88,10 @@ public class JDBCDMLWriteOp extends JDBCDMLOp { } } catch (SQLException sqlException) { + LOGGER.info("pStmtSqlStr={}", pStmtSqlStr); + LOGGER.info("pStmtValList={}", pStmtValList); + LOGGER.info("value:{},trackingCnt:{}",value,trackingCnt); + throw new JDBCAdapterUnexpectedException( "Failed to execute the prepared DDL statement: \"" + pStmtSqlStr + "\", " + "with values: \"" + pStmtValList + "\""); diff --git a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCOp.java b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCOp.java index 38e1189af..18eafea13 100644 --- a/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCOp.java +++ b/adapter-jdbc/src/main/java/io/nosqlbench/adapter/jdbc/optypes/JDBCOp.java @@ -33,19 +33,8 @@ public abstract class JDBCOp implements CycleOp { "Executed the JDBC statement & committed the connection successfully"; protected final JDBCSpace jdbcSpace; - protected final Connection jdbcConnection; public JDBCOp(JDBCSpace jdbcSpace) { this.jdbcSpace = jdbcSpace; - String curThreadName = Thread.currentThread().getName(); - this.jdbcConnection = this.jdbcSpace.getConnection(curThreadName); - } - - protected void closeStatement(Statement stmt) throws SQLException { - if (! (stmt instanceof PreparedStatement)) { - stmt.close(); - } else if (jdbcSpace.isShuttingDown()) { - stmt.close(); - } } }