JDBC code temp change - 20131019 09:40pm

This commit is contained in:
yabinmeng 2023-10-19 21:40:45 -05:00
parent 45b5e52fd9
commit ce60423ea6
10 changed files with 107 additions and 140 deletions

View File

@ -52,15 +52,6 @@ public class JDBCOpMapper implements OpMapper<JDBCOp> {
String spaceName = op.getStaticConfigOr("space", "default");
JDBCSpace jdbcSpace = spaceCache.get(spaceName);
int nbThreadNum = NumberUtils.toInt(op.getStaticConfig("threads", String.class));
int maxConnNum = jdbcSpace.getMaxNumConn();
if (nbThreadNum > maxConnNum) {
throw new JDBCAdapterInvalidParamException(
"JDBC connection is NOT thread safe. The total NB thread number (" + nbThreadNum +
") can NOT be greater than the maximum connection number 'num_conn' (" + maxConnNum + ")"
);
}
/*
* If the user provides a body element, then they want to provide the JSON or
* a data structure that can be converted into JSON, bypassing any further

View File

@ -28,15 +28,14 @@ import io.nosqlbench.api.config.standard.NBConfiguration;
import io.nosqlbench.api.config.standard.Param;
import io.nosqlbench.api.errors.OpConfigError;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@ -48,33 +47,27 @@ public class JDBCSpace implements AutoCloseable {
// NOTE: Since JDBC connection is NOT thread-safe, the total NB threads MUST be less
// than or equal to this number. This is to make sure one thread per connection.
private final static int DEFAULT_CONN_NUM = 5;
private final int maxNumConn;
private int maxNumConn = DEFAULT_CONN_NUM;
// For DML statements, how many statements to put together in one batch
// For DML write statements, how many statements to put together in one batch
// - 1 : no batch (default)
// - positive number: using batch
private final static int DEFAULT_DML_BATCH_NUM = 1;
private final int dmlBatchNum;
private int dmlBatchNum = DEFAULT_DML_BATCH_NUM;
private final long totalCycleNum;
private static boolean isShuttingDown = false;
private final int totalThreadNum;
private HikariConfig hikariConfig;
private HikariDataSource hikariDataSource;
private final Random random;
ConcurrentHashMap<String, Connection> connections = new ConcurrentHashMap<>();
public JDBCSpace(String spaceName, NBConfiguration cfg) {
this.spaceName = spaceName;
this.totalCycleNum = NumberUtils.toLong(cfg.getOptional("cycles").orElse("1"));
int totalThreads = NumberUtils.toInt(cfg.getOptional("threads").orElse("1"));
int numConnInput = NumberUtils.toInt(cfg.getOptional("num_conn").orElse("10"));
this.maxNumConn = Math.min(totalThreads, numConnInput);
if (this.maxNumConn < 1) {
throw new JDBCAdapterInvalidParamException(
"'num_conn' NB CLI parameter must be a positive number!"
);
}
this.totalThreadNum = NumberUtils.toInt(cfg.getOptional("threads").orElse("1"));
this.random = new Random();
// Must be after the 'maxNumConn' statements and before the rest of the remaining statements!
this.initializeSpace(cfg);
@ -95,12 +88,6 @@ public class JDBCSpace implements AutoCloseable {
"Using batch, 'dml_batch'(" + this.dmlBatchNum + ") > 1, along with 'autoCommit' ON is not supported!"
);
}
if (logger.isDebugEnabled()) {
logger.debug("{} JDBC connections will be created [max(threads/{}, num_conn/{}]; " +
"dml_batch: {}, autoCommit: {}",
maxNumConn, totalThreads, numConnInput, dmlBatchNum, hikariConfig.isAutoCommit());
}
}
@Override
@ -109,42 +96,39 @@ public class JDBCSpace implements AutoCloseable {
}
public int getMaxNumConn() { return this.maxNumConn; }
public void setMaxNumConn(int i) { this.maxNumConn = i; }
public int getDmlBatchNum() { return this.dmlBatchNum; }
public long getTotalCycleNum() { return this.totalCycleNum; }
public boolean isShuttingDown() { return isShuttingDown; }
public void enterShutdownStage() { isShuttingDown = true; }
public int getTotalThreadNum() { return this.totalThreadNum; }
public HikariDataSource getHikariDataSource() {
return this.hikariDataSource;
}
public Connection getConnection(String connectionName) {
Connection connection = connections.get(connectionName);
if (connection == null) {
public Connection getConnection() {
int rnd = random.nextInt(0, getMaxNumConn());
final String connectionName = "jdbc-conn-" + rnd;
return connections.computeIfAbsent(connectionName, key -> {
try {
connection = hikariDataSource.getConnection();
Connection connection = hikariDataSource.getConnection();
if (logger.isDebugEnabled()) {
logger.debug("JDBC connection ({}) is successfully created: {}",
connectionName, connection);
}
// Register 'vector' type
JDBCPgVector.addVectorType(connection);
connections.put(connectionName, connection);
return connection;
}
catch (Exception ex) {
String exp = "Exception occurred while attempting to create a connection using the HikariDataSource";
logger.error(exp, ex);
throw new JDBCAdapterUnexpectedException(exp);
}
}
return connection;
});
}
private void initializeSpace(NBConfiguration cfg) {
@ -210,22 +194,22 @@ public class JDBCSpace implements AutoCloseable {
hikariConfig.setAutoCommit(autoCommit);
hikariConfig.setKeepaliveTime(Integer.parseInt(cfg.get("keepaliveTime")));
// HikariCP "maximumPoolSize" parameter is ignored.
// Use the NB "num_conn" parameter instead, wth 20% extra capacity
hikariConfig.setMaximumPoolSize((int)Math.ceil(1.2*maxNumConn));
this.hikariDataSource = new HikariDataSource(hikariConfig);
logger.info("hikariDataSource is created : {}", hikariDataSource);
}
private void shutdownSpace() {
isShuttingDown = true;
try {
waitUntilAllOpFinished(System.currentTimeMillis());
logger.info("Total {} of connections is being closed ...", connections.size());
for (Connection connection : connections.values()) {
connection.close();
if (logger.isDebugEnabled()) {
logger.debug("Close connection : {}", connection);
connection.close();
}
}
} catch (Exception e) {
throw new JDBCAdapterUnexpectedException("Unexpected error when trying to close the JDBC connection!");
@ -234,24 +218,6 @@ public class JDBCSpace implements AutoCloseable {
hikariDataSource.close();
}
private void waitUntilAllOpFinished(long shutdownStartTimeMills) {
final int timeToWaitInSec = 5;
long timeElapsedMills;
boolean continueChk;
do {
JDBCAdapterUtil.pauseCurThreadExec(1);
long curTimeMills = System.currentTimeMillis();
timeElapsedMills = curTimeMills - shutdownStartTimeMills;
continueChk = (timeElapsedMills <= (timeToWaitInSec*1000));
} while (continueChk);
logger.info(
"shutdownSpace::waitUntilAllOpFinished -- " +
"shutdown time elapsed: " + timeElapsedMills + "ms.");
}
public static NBConfigModel getConfigModel() {
return ConfigModel.of(JDBCSpace.class)
.add(Param.defaultTo("num_conn", DEFAULT_CONN_NUM)

View File

@ -17,10 +17,12 @@
package io.nosqlbench.adapter.jdbc.opdispensers;
import io.nosqlbench.adapter.jdbc.JDBCSpace;
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterInvalidParamException;
import io.nosqlbench.adapter.jdbc.optypes.JDBCOp;
import io.nosqlbench.adapters.api.activityimpl.BaseOpDispenser;
import io.nosqlbench.adapters.api.activityimpl.uniform.DriverAdapter;
import io.nosqlbench.adapters.api.templating.ParsedOp;
import org.apache.commons.lang3.math.NumberUtils;
public abstract class JDBCBaseOpDispenser extends BaseOpDispenser<JDBCOp, JDBCSpace> {
protected static final String ERROR_STATEMENT_CREATION =
@ -39,9 +41,4 @@ public abstract class JDBCBaseOpDispenser extends BaseOpDispenser<JDBCOp, JDBCSp
this.isPreparedStatement = op.getStaticConfigOr("prepared", false);
this.verifierKeyName = op.getStaticConfigOr("verifier-key", "");
}
public void checkShutdownEntry(long cycle) {
if (cycle == (jdbcSpace.getTotalCycleNum()-1)) {
jdbcSpace.enterShutdownStage();
}
}
}

View File

@ -47,7 +47,6 @@ public class JDBCDDLOpDispenser extends JDBCBaseOpDispenser {
}
@Override
public JDBCDDLOp apply(long cycle) {
checkShutdownEntry(cycle);
String ddlSqlStr = ddlSqlStrFunc.apply(cycle);
return new JDBCDDLOp(jdbcSpace, ddlSqlStr);
}

View File

@ -48,6 +48,39 @@ public class JDBCDMLOpDispenser extends JDBCBaseOpDispenser {
this.isDdlStatement = false;
this.isReadStatement = isReadStmt;
int numConnInput = Integer.parseInt(op.getStaticConfig("num_conn", String.class));
// Only apply 'one-thread-per-connection' limit to the WRITE workload
// due to the fact that the PostgreSQL connection is not thread safe
// For the READ workload, Do NOT apply this limitation.
int threadNum = jdbcSpace.getTotalThreadNum();
int maxNumConnFinal = numConnInput;
// For write workload, avoid thread-safety issue by using a constrained connection number
// For read workload, it is ok to use more threads than available connections
if (!isReadStmt) {
if (threadNum > numConnInput) {
throw new JDBCAdapterInvalidParamException(
"JDBC connection is NOT thread safe. For write workload, the total NB thread number (" + threadNum +
") can NOT be greater than the maximum connection number 'num_conn' (" + numConnInput + ")"
);
}
}
maxNumConnFinal = Math.min(threadNum, maxNumConnFinal);
if (maxNumConnFinal < 1) {
throw new JDBCAdapterInvalidParamException(
"'num_conn' NB CLI parameter must be a positive number!"
);
}
jdbcSpace.setMaxNumConn(maxNumConnFinal);
logger.info("Total {} JDBC connections will be created [isReadStmt:{}, threads/{}, num_conn/{}]; " +
"dml_batch: {}, autoCommit: {}",
maxNumConnFinal, isReadStmt, threadNum, numConnInput,
jdbcSpace.getDmlBatchNum(), jdbcSpace.getHikariDataSource().isAutoCommit());
// TODO: this is a current limitation applied by this adapter
// improve this behavior by allowing the user to choose
if (!isPreparedStatement && !isReadStatement) {
throw new JDBCAdapterInvalidParamException("DML write statements MUST be prepared!");
}
@ -69,8 +102,6 @@ public class JDBCDMLOpDispenser extends JDBCBaseOpDispenser {
@Override
public JDBCDMLOp apply(long cycle) {
checkShutdownEntry(cycle);
if (isReadStatement) {
return new JDBCDMLReadOp(
jdbcSpace,

View File

@ -33,20 +33,16 @@ public class JDBCDDLOp extends JDBCOp {
this.ddlStmtStr = ddlStmtStr;
}
private Statement createDDLStatement() {
try {
return jdbcConnection.createStatement();
} catch (SQLException e) {
throw new JDBCAdapterUnexpectedException(
"Unable to create a regular (non-prepared) JDBC statement");
}
private Statement createDDLStatement(Connection connection) throws SQLException {
return connection.createStatement();
}
@Override
public Object apply(long value) {
try {
Statement stmt = createDDLStatement();
stmt.execute(ddlStmtStr);
closeStatement(stmt);
Connection connection = jdbcSpace.getConnection();
try (Statement stmt = createDDLStatement(connection)) {
stmt.execute(ddlStmtStr);
}
return true;
} catch (SQLException sqlException) {
throw new JDBCAdapterUnexpectedException(

View File

@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
@ -73,12 +74,12 @@ public abstract class JDBCDMLOp extends JDBCOp {
}
// Only applicable to a prepared statement
protected PreparedStatement setPrepStmtValues(PreparedStatement stmt, List<Object> valList) {
protected PreparedStatement setPrepStmtValues(PreparedStatement stmt) {
assert (stmt != null);
for (int i=0; i<valList.size(); i++) {
for (int i=0; i<pStmtValList.size(); i++) {
int fieldIdx = i + 1;
Object fieldValObj = valList.get(i);
Object fieldValObj = pStmtValList.get(i);
assert (fieldValObj != null);
try {
@ -107,26 +108,24 @@ public abstract class JDBCDMLOp extends JDBCOp {
return stmt;
}
protected void processCommit() {
try {
if (!jdbcConnection.getAutoCommit()) {
jdbcConnection.commit();
protected void processCommit(Connection connection) throws SQLException {
if ( (connection!=null) && !(connection.isClosed()) ) {
if (!connection.getAutoCommit()) {
connection.commit();
LOGGER.debug(() -> LOG_COMMIT_SUCCESS);
}
} catch (SQLException e) {
throw new JDBCAdapterUnexpectedException("Failed to process JDBC statement commit!");
}
}
protected Statement createDMLStatement() {
protected Statement createDMLStatement(Connection connection) throws SQLException {
Statement stmt = jdbcStmtTL.get();
try {
if (stmt == null) {
if (stmt == null) {
if ( (connection!=null) && !(connection.isClosed()) ) {
if (isPreparedStmt)
stmt = jdbcConnection.prepareStatement(pStmtSqlStr);
stmt = connection.prepareStatement(pStmtSqlStr);
else
stmt = jdbcConnection.createStatement();
stmt = connection.createStatement();
jdbcStmtTL.set(stmt);
@ -137,11 +136,8 @@ public abstract class JDBCDMLOp extends JDBCOp {
stmt);
}
}
return stmt;
} catch (SQLException e) {
throw new JDBCAdapterUnexpectedException(
"Unable to create a prepared JDBC statement");
}
return stmt;
}
}

View File

@ -41,12 +41,13 @@ public class JDBCDMLReadOp extends JDBCDMLOp {
@Override
public Object apply(long value) {
Statement stmt = super.createDMLStatement();
if (isPreparedStmt) {
stmt = super.setPrepStmtValues((PreparedStatement) stmt, this.pStmtValList);
}
try {
Connection connection = super.jdbcSpace.getConnection();
Statement stmt = super.createDMLStatement(connection);
if (isPreparedStmt) {
stmt = setPrepStmtValues((PreparedStatement) stmt);
}
try {
// key string list to be used in the "Vector" relevancy score verification
List<String> verifierValueList = new ArrayList<>();
@ -59,11 +60,10 @@ public class JDBCDMLReadOp extends JDBCDMLOp {
verifierValueList.add(keyVal);
}
} while (rs.next());
closeStatement(stmt);
}
else {
boolean isResultSet = ((PreparedStatement)stmt).execute();
super.processCommit();
super.processCommit(connection);
while(true) {
if(isResultSet) {
@ -82,8 +82,6 @@ public class JDBCDMLReadOp extends JDBCDMLOp {
}
isResultSet = stmt.getMoreResults();
}
closeStatement(stmt);
}
return verifierValueList;

View File

@ -20,6 +20,7 @@ import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
@ -44,24 +45,23 @@ public class JDBCDMLWriteOp extends JDBCDMLOp {
@Override
public Object apply(long value) {
int trackingCnt = threadBatchTrackingCntTL.get();
trackingCnt = trackingCnt + 1;
trackingCnt++;
threadBatchTrackingCntTL.set(trackingCnt);
PreparedStatement stmt = (PreparedStatement) super.createDMLStatement();
stmt = super.setPrepStmtValues(stmt, this.pStmtValList);
try {
assert (isPreparedStmt);
Connection connection = super.jdbcSpace.getConnection();
PreparedStatement stmt = (PreparedStatement) super.createDMLStatement(connection);
stmt = super.setPrepStmtValues(stmt);
// No batch
if (ddlStmtBatchNum == 1) {
int result_cnt = stmt.executeUpdate();
super.processCommit();
closeStatement(stmt);
super.processCommit(connection);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[single ddl - execution] cycle:{}, result_cnt: {}, stmt: {}",
value, result_cnt, stmt);
}
return result_cnt;
}
// Use batch
@ -71,12 +71,12 @@ public class JDBCDMLWriteOp extends JDBCDMLOp {
LOGGER.debug("[batch ddl - adding to batch] cycle:{}, stmt: {}",
value, stmt);
}
if ( (trackingCnt % ddlStmtBatchNum == 0) || jdbcSpace.isShuttingDown() ) {
// NOTE: if the total number of cycles is not the multiple of the batch number,
// some records in a batch may not be written to the database
// To avoid this, make sure the total cycle number is the multiple of the batch number
if (trackingCnt % ddlStmtBatchNum == 0) {
int[] counts = stmt.executeBatch();
processCommit();
closeStatement(stmt);
processCommit(connection);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("[batch ddl - execution] cycle:{}, total_batch_res_cnt:{}, stmt: {}",
value, counts, stmt);
@ -88,6 +88,10 @@ public class JDBCDMLWriteOp extends JDBCDMLOp {
}
}
catch (SQLException sqlException) {
LOGGER.info("pStmtSqlStr={}", pStmtSqlStr);
LOGGER.info("pStmtValList={}", pStmtValList);
LOGGER.info("value:{},trackingCnt:{}",value,trackingCnt);
throw new JDBCAdapterUnexpectedException(
"Failed to execute the prepared DDL statement: \"" + pStmtSqlStr + "\", " +
"with values: \"" + pStmtValList + "\"");

View File

@ -33,19 +33,8 @@ public abstract class JDBCOp implements CycleOp {
"Executed the JDBC statement & committed the connection successfully";
protected final JDBCSpace jdbcSpace;
protected final Connection jdbcConnection;
public JDBCOp(JDBCSpace jdbcSpace) {
this.jdbcSpace = jdbcSpace;
String curThreadName = Thread.currentThread().getName();
this.jdbcConnection = this.jdbcSpace.getConnection(curThreadName);
}
protected void closeStatement(Statement stmt) throws SQLException {
if (! (stmt instanceof PreparedStatement)) {
stmt.close();
} else if (jdbcSpace.isShuttingDown()) {
stmt.close();
}
}
}