mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
JDBC code temp change - 20131020 10:00am
This commit is contained in:
parent
ce60423ea6
commit
ddd4ce5426
@ -38,6 +38,7 @@ import java.sql.Connection;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
public class JDBCSpace implements AutoCloseable {
|
||||
private final static Logger logger = LogManager.getLogger(JDBCSpace.class);
|
||||
@ -59,15 +60,17 @@ public class JDBCSpace implements AutoCloseable {
|
||||
private final int totalThreadNum;
|
||||
private HikariConfig hikariConfig;
|
||||
private HikariDataSource hikariDataSource;
|
||||
private final Random random;
|
||||
|
||||
ConcurrentHashMap<String, Connection> connections = new ConcurrentHashMap<>();
|
||||
// Maintain a client-side pooling just to make sure the allocated connections can
|
||||
// be reclaimed quickly, instead of waiting for Hikari pooling to reclaim it eventually
|
||||
public record ConnectionCacheKey(String connName) {
|
||||
}
|
||||
private final ConcurrentHashMap<ConnectionCacheKey, Connection> connections = new ConcurrentHashMap<>();
|
||||
|
||||
public JDBCSpace(String spaceName, NBConfiguration cfg) {
|
||||
this.spaceName = spaceName;
|
||||
this.totalCycleNum = NumberUtils.toLong(cfg.getOptional("cycles").orElse("1"));
|
||||
this.totalThreadNum = NumberUtils.toInt(cfg.getOptional("threads").orElse("1"));
|
||||
this.random = new Random();
|
||||
|
||||
// Must be after the 'maxNumConn' statements and before the rest of the remaining statements!
|
||||
this.initializeSpace(cfg);
|
||||
@ -107,28 +110,8 @@ public class JDBCSpace implements AutoCloseable {
|
||||
return this.hikariDataSource;
|
||||
}
|
||||
|
||||
public Connection getConnection() {
|
||||
int rnd = random.nextInt(0, getMaxNumConn());
|
||||
final String connectionName = "jdbc-conn-" + rnd;
|
||||
|
||||
return connections.computeIfAbsent(connectionName, key -> {
|
||||
try {
|
||||
Connection connection = hikariDataSource.getConnection();
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("JDBC connection ({}) is successfully created: {}",
|
||||
connectionName, connection);
|
||||
}
|
||||
// Register 'vector' type
|
||||
JDBCPgVector.addVectorType(connection);
|
||||
|
||||
return connection;
|
||||
}
|
||||
catch (Exception ex) {
|
||||
String exp = "Exception occurred while attempting to create a connection using the HikariDataSource";
|
||||
logger.error(exp, ex);
|
||||
throw new JDBCAdapterUnexpectedException(exp);
|
||||
}
|
||||
});
|
||||
public Connection getConnection(ConnectionCacheKey key, Supplier<Connection> connectionSupplier) {
|
||||
return connections.computeIfAbsent(key, __ -> connectionSupplier.get());
|
||||
}
|
||||
|
||||
private void initializeSpace(NBConfiguration cfg) {
|
||||
@ -187,11 +170,11 @@ public class JDBCSpace implements AutoCloseable {
|
||||
hikariConfig.addDataSourceProperty("applicationName", cfg.get("applicationName"));
|
||||
hikariConfig.addDataSourceProperty("rewriteBatchedInserts", cfg.getOrDefault("rewriteBatchedInserts", true));
|
||||
|
||||
// We're managing the auto-commit behavior of connections ourselves and hence disabling the auto-commit.
|
||||
Optional<String> autoCommitOpt = cfg.getOptional("autoCommit");
|
||||
boolean autoCommit = false;
|
||||
if (autoCommitOpt.isPresent()) autoCommit = BooleanUtils.toBoolean(autoCommitOpt.get());
|
||||
hikariConfig.setAutoCommit(autoCommit);
|
||||
if (autoCommitOpt.isPresent()) {
|
||||
boolean autoCommit = autoCommit = BooleanUtils.toBoolean(autoCommitOpt.get());
|
||||
hikariConfig.setAutoCommit(autoCommit);
|
||||
}
|
||||
|
||||
hikariConfig.setKeepaliveTime(Integer.parseInt(cfg.get("keepaliveTime")));
|
||||
// Use the NB "num_conn" parameter instead, wth 20% extra capacity
|
||||
@ -204,7 +187,7 @@ public class JDBCSpace implements AutoCloseable {
|
||||
|
||||
private void shutdownSpace() {
|
||||
try {
|
||||
logger.info("Total {} of connections is being closed ...", connections.size());
|
||||
logger.info("Shutting down JDBCSpace -- total {} of connections is being closed ...", connections.size());
|
||||
for (Connection connection : connections.values()) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Close connection : {}", connection);
|
||||
|
@ -41,6 +41,8 @@ public class JDBCDDLOpDispenser extends JDBCBaseOpDispenser {
|
||||
this.isDdlStatement = true;
|
||||
this.ddlSqlStrFunc = sqlStrFunc;
|
||||
|
||||
// For DDL statements, must use autoCommit
|
||||
assert(jdbcSpace.getHikariDataSource().isAutoCommit());
|
||||
if (isPreparedStatement) {
|
||||
throw new JDBCAdapterInvalidParamException("DDL statements can NOT be prepared!");
|
||||
}
|
||||
|
@ -33,16 +33,11 @@ public class JDBCDDLOp extends JDBCOp {
|
||||
this.ddlStmtStr = ddlStmtStr;
|
||||
}
|
||||
|
||||
private Statement createDDLStatement(Connection connection) throws SQLException {
|
||||
return connection.createStatement();
|
||||
}
|
||||
@Override
|
||||
public Object apply(long value) {
|
||||
try {
|
||||
Connection connection = jdbcSpace.getConnection();
|
||||
try (Statement stmt = createDDLStatement(connection)) {
|
||||
stmt.execute(ddlStmtStr);
|
||||
}
|
||||
Statement stmt = jdbcConnection.createStatement();
|
||||
stmt.execute(ddlStmtStr);
|
||||
return true;
|
||||
} catch (SQLException sqlException) {
|
||||
throw new JDBCAdapterUnexpectedException(
|
||||
|
@ -18,13 +18,11 @@ package io.nosqlbench.adapter.jdbc.optypes;
|
||||
import io.nosqlbench.adapter.jdbc.JDBCSpace;
|
||||
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterInvalidParamException;
|
||||
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException;
|
||||
import io.nosqlbench.adapter.jdbc.exceptions.JDBCPgVectorException;
|
||||
import io.nosqlbench.adapter.jdbc.utils.JDBCPgVector;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
@ -74,7 +72,7 @@ public abstract class JDBCDMLOp extends JDBCOp {
|
||||
}
|
||||
|
||||
// Only applicable to a prepared statement
|
||||
protected PreparedStatement setPrepStmtValues(PreparedStatement stmt) {
|
||||
protected PreparedStatement setPrepStmtValues(PreparedStatement stmt) throws SQLException {
|
||||
assert (stmt != null);
|
||||
|
||||
for (int i=0; i<pStmtValList.size(); i++) {
|
||||
@ -99,8 +97,8 @@ public abstract class JDBCDMLOp extends JDBCOp {
|
||||
}
|
||||
stmt.setObject(fieldIdx, fieldValObj);
|
||||
}
|
||||
catch (JDBCPgVectorException | SQLException e) {
|
||||
throw new RuntimeException(
|
||||
catch ( SQLException e) {
|
||||
throw new SQLException(
|
||||
"Failed to parse the prepared statement value for field[" + fieldIdx + "] " + fieldValObj);
|
||||
}
|
||||
}
|
||||
@ -108,36 +106,31 @@ public abstract class JDBCDMLOp extends JDBCOp {
|
||||
return stmt;
|
||||
}
|
||||
|
||||
protected void processCommit(Connection connection) throws SQLException {
|
||||
if ( (connection!=null) && !(connection.isClosed()) ) {
|
||||
if (!connection.getAutoCommit()) {
|
||||
connection.commit();
|
||||
LOGGER.debug(() -> LOG_COMMIT_SUCCESS);
|
||||
}
|
||||
|
||||
protected void processCommit() throws SQLException {
|
||||
if (!jdbcConnection.getAutoCommit()) {
|
||||
jdbcConnection.commit();
|
||||
LOGGER.debug(() -> LOG_COMMIT_SUCCESS);
|
||||
}
|
||||
}
|
||||
|
||||
protected Statement createDMLStatement(Connection connection) throws SQLException {
|
||||
protected Statement createDMLStatement() throws SQLException {
|
||||
Statement stmt = jdbcStmtTL.get();
|
||||
|
||||
if (stmt == null) {
|
||||
if ( (connection!=null) && !(connection.isClosed()) ) {
|
||||
if (isPreparedStmt)
|
||||
stmt = connection.prepareStatement(pStmtSqlStr);
|
||||
else
|
||||
stmt = connection.createStatement();
|
||||
if (isPreparedStmt)
|
||||
stmt = jdbcConnection.prepareStatement(pStmtSqlStr);
|
||||
else
|
||||
stmt = jdbcConnection.createStatement();
|
||||
|
||||
jdbcStmtTL.set(stmt);
|
||||
jdbcStmtTL.set(stmt);
|
||||
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("A statement is created -- prepared: {}, read/write: {}, stmt: {}",
|
||||
isPreparedStmt,
|
||||
isReadStmt ? "read" : "write",
|
||||
stmt);
|
||||
}
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("A statement is created -- prepared: {}, read/write: {}, stmt: {}",
|
||||
isPreparedStmt,
|
||||
isReadStmt ? "read" : "write",
|
||||
stmt);
|
||||
}
|
||||
}
|
||||
|
||||
return stmt;
|
||||
}
|
||||
}
|
||||
|
@ -42,8 +42,7 @@ public class JDBCDMLReadOp extends JDBCDMLOp {
|
||||
@Override
|
||||
public Object apply(long value) {
|
||||
try {
|
||||
Connection connection = super.jdbcSpace.getConnection();
|
||||
Statement stmt = super.createDMLStatement(connection);
|
||||
Statement stmt = super.createDMLStatement();
|
||||
if (isPreparedStmt) {
|
||||
stmt = setPrepStmtValues((PreparedStatement) stmt);
|
||||
}
|
||||
@ -63,7 +62,7 @@ public class JDBCDMLReadOp extends JDBCDMLOp {
|
||||
}
|
||||
else {
|
||||
boolean isResultSet = ((PreparedStatement)stmt).execute();
|
||||
super.processCommit(connection);
|
||||
super.processCommit();
|
||||
|
||||
while(true) {
|
||||
if(isResultSet) {
|
||||
|
@ -50,14 +50,13 @@ public class JDBCDMLWriteOp extends JDBCDMLOp {
|
||||
|
||||
try {
|
||||
assert (isPreparedStmt);
|
||||
Connection connection = super.jdbcSpace.getConnection();
|
||||
PreparedStatement stmt = (PreparedStatement) super.createDMLStatement(connection);
|
||||
PreparedStatement stmt = (PreparedStatement) super.createDMLStatement();
|
||||
stmt = super.setPrepStmtValues(stmt);
|
||||
|
||||
// No batch
|
||||
if (ddlStmtBatchNum == 1) {
|
||||
int result_cnt = stmt.executeUpdate();
|
||||
super.processCommit(connection);
|
||||
super.processCommit();
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("[single ddl - execution] cycle:{}, result_cnt: {}, stmt: {}",
|
||||
value, result_cnt, stmt);
|
||||
@ -76,7 +75,7 @@ public class JDBCDMLWriteOp extends JDBCDMLOp {
|
||||
// To avoid this, make sure the total cycle number is the multiple of the batch number
|
||||
if (trackingCnt % ddlStmtBatchNum == 0) {
|
||||
int[] counts = stmt.executeBatch();
|
||||
processCommit(connection);
|
||||
processCommit();
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("[batch ddl - execution] cycle:{}, total_batch_res_cnt:{}, stmt: {}",
|
||||
value, counts, stmt);
|
||||
@ -88,10 +87,6 @@ public class JDBCDMLWriteOp extends JDBCDMLOp {
|
||||
}
|
||||
}
|
||||
catch (SQLException sqlException) {
|
||||
LOGGER.info("pStmtSqlStr={}", pStmtSqlStr);
|
||||
LOGGER.info("pStmtValList={}", pStmtValList);
|
||||
LOGGER.info("value:{},trackingCnt:{}",value,trackingCnt);
|
||||
|
||||
throw new JDBCAdapterUnexpectedException(
|
||||
"Failed to execute the prepared DDL statement: \"" + pStmtSqlStr + "\", " +
|
||||
"with values: \"" + pStmtValList + "\"");
|
||||
|
@ -18,6 +18,7 @@ package io.nosqlbench.adapter.jdbc.optypes;
|
||||
|
||||
import io.nosqlbench.adapter.jdbc.JDBCSpace;
|
||||
import io.nosqlbench.adapter.jdbc.exceptions.JDBCAdapterUnexpectedException;
|
||||
import io.nosqlbench.adapter.jdbc.utils.JDBCPgVector;
|
||||
import io.nosqlbench.adapters.api.activityimpl.uniform.flowtypes.CycleOp;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
@ -26,6 +27,7 @@ import java.sql.Connection;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.Random;
|
||||
|
||||
public abstract class JDBCOp implements CycleOp {
|
||||
private static final Logger LOGGER = LogManager.getLogger(JDBCOp.class);
|
||||
@ -33,8 +35,36 @@ public abstract class JDBCOp implements CycleOp {
|
||||
"Executed the JDBC statement & committed the connection successfully";
|
||||
|
||||
protected final JDBCSpace jdbcSpace;
|
||||
protected final Connection jdbcConnection;
|
||||
private final Random random = new Random();
|
||||
|
||||
public JDBCOp(JDBCSpace jdbcSpace) {
|
||||
this.jdbcSpace = jdbcSpace;
|
||||
this.jdbcConnection = getConnection();
|
||||
}
|
||||
|
||||
private Connection getConnection() {
|
||||
int rnd = random.nextInt(0, jdbcSpace.getMaxNumConn());
|
||||
final String connectionName = "jdbc-conn-" + rnd;
|
||||
|
||||
return jdbcSpace.getConnection(
|
||||
new JDBCSpace.ConnectionCacheKey(connectionName), () -> {
|
||||
try {
|
||||
Connection connection = jdbcSpace.getHikariDataSource().getConnection();
|
||||
if (LOGGER.isDebugEnabled()) {
|
||||
LOGGER.debug("JDBC connection ({}) is successfully created: {}",
|
||||
connectionName, connection);
|
||||
}
|
||||
// Register 'vector' type
|
||||
JDBCPgVector.addVectorType(connection);
|
||||
|
||||
return connection;
|
||||
}
|
||||
catch (Exception ex) {
|
||||
String exp = "Exception occurred while attempting to create a connection using the HikariDataSource";
|
||||
LOGGER.error(exp, ex);
|
||||
throw new JDBCAdapterUnexpectedException(exp);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user