mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Merge pull request #314 from szimmer1/cockroachdb-driver-errorhandler
Refactor JDBCActivity to use NBErrorHandler
This commit is contained in:
commit
63b0a68118
@ -7,7 +7,6 @@ import org.apache.logging.log4j.Logger;
|
|||||||
import org.postgresql.ds.PGSimpleDataSource;
|
import org.postgresql.ds.PGSimpleDataSource;
|
||||||
|
|
||||||
import javax.sql.DataSource;
|
import javax.sql.DataSource;
|
||||||
import java.sql.SQLException;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
|
||||||
public class CockroachActivity extends JDBCActivity {
|
public class CockroachActivity extends JDBCActivity {
|
||||||
@ -17,6 +16,14 @@ public class CockroachActivity extends JDBCActivity {
|
|||||||
super(activityDef);
|
super(activityDef);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO provide an error handler with sane defaults including
|
||||||
|
// * retry on 40001 SQL state code (CockroachDB txn retry)
|
||||||
|
// * retry (implement exponential, to avoid stampeding herd) on timeout getting connection from connection pool
|
||||||
|
//
|
||||||
|
//@Override
|
||||||
|
//public NBErrorHandler getErrorHandler() {
|
||||||
|
//}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected DataSource newDataSource() {
|
protected DataSource newDataSource() {
|
||||||
PGSimpleDataSource ds = new PGSimpleDataSource();
|
PGSimpleDataSource ds = new PGSimpleDataSource();
|
||||||
@ -26,13 +33,17 @@ public class CockroachActivity extends JDBCActivity {
|
|||||||
getOptionalString("serverName").
|
getOptionalString("serverName").
|
||||||
orElseThrow(() -> new RuntimeException("serverName parameter required"));
|
orElseThrow(() -> new RuntimeException("serverName parameter required"));
|
||||||
|
|
||||||
// portNumber, user, password are optional
|
// portNumber, databaseName, user, password are optional
|
||||||
Integer portNumber = getParams().getOptionalInteger("portNumber").orElse(26257);
|
Integer portNumber = getParams().getOptionalInteger("portNumber").orElse(26257);
|
||||||
|
String databaseName = getParams().getOptionalString("databaseName").orElse(null);
|
||||||
String user = getParams().getOptionalString("user").orElse(null);
|
String user = getParams().getOptionalString("user").orElse(null);
|
||||||
String password = getParams().getOptionalString("password").orElse(null);
|
String password = getParams().getOptionalString("password").orElse(null);
|
||||||
|
|
||||||
ds.setServerNames(new String[]{serverName});
|
ds.setServerNames(new String[]{serverName});
|
||||||
ds.setPortNumbers(new int[]{portNumber});
|
ds.setPortNumbers(new int[]{portNumber});
|
||||||
|
if (databaseName != null) {
|
||||||
|
ds.setDatabaseName(databaseName);
|
||||||
|
}
|
||||||
if (user != null) {
|
if (user != null) {
|
||||||
ds.setUser(user);
|
ds.setUser(user);
|
||||||
}
|
}
|
||||||
@ -40,17 +51,13 @@ public class CockroachActivity extends JDBCActivity {
|
|||||||
ds.setPassword(password);
|
ds.setPassword(password);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOGGER.debug("Final DataSource fields"
|
LOGGER.debug("Final DataSource fields:"
|
||||||
+ " serverNames=" + Arrays.toString(ds.getServerNames())
|
+ " serverNames=" + Arrays.toString(ds.getServerNames())
|
||||||
+ " portNumbers=" + Arrays.toString(ds.getPortNumbers())
|
+ " portNumbers=" + Arrays.toString(ds.getPortNumbers())
|
||||||
|
+ " databaseName=" + ds.getDatabaseName()
|
||||||
+ " user=" + ds.getUser()
|
+ " user=" + ds.getUser()
|
||||||
+ " password=" + ds.getPassword());
|
+ " password=" + ds.getPassword());
|
||||||
|
|
||||||
return ds;
|
return ds;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean isRetryable(SQLException sqlException) {
|
|
||||||
return sqlException.getSQLState().equals("40001"); // sql state code for transaction conflict
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -10,11 +10,17 @@ the
|
|||||||
[DataSource Configuration Properties](https://jdbc.postgresql.org/documentation/81/ds-ds.html)
|
[DataSource Configuration Properties](https://jdbc.postgresql.org/documentation/81/ds-ds.html)
|
||||||
section for detailed parameter documentation.
|
section for detailed parameter documentation.
|
||||||
|
|
||||||
* **serverName** (required) - database hostname
|
* **serverName** (required) - database hostname.
|
||||||
* **portNumber** (optional) - database listen port; defaults to 26257.
|
* **databaseName** (optional) - database namespace to use; Default *null*.
|
||||||
* **user** (optional) - database account username; defaults to empty.
|
* **portNumber** (optional) - database listen port; Default *26257*.
|
||||||
* **password** (optional) - database account password; defaults to empty.
|
* **user** (optional) - database account username; Default *null*.
|
||||||
* **connectionpool** (optional) - connection pool implementation; defaults
|
* **password** (optional) - database account password; Default *null*.
|
||||||
to no connection pool. Allowed values:
|
* **connectionpool** (optional) - connection pool implementation; Default
|
||||||
|
no connection pool, in other words create a connection per statement execution.
|
||||||
|
Allowed values:
|
||||||
* *hikari* -
|
* *hikari* -
|
||||||
use [HikariCP](https://github.com/brettwooldridge/HikariCP)
|
use [HikariCP](https://github.com/brettwooldridge/HikariCP)
|
||||||
|
* **maxtries** (optional) - number of times to retry retry-able errors; Default *3*.
|
||||||
|
* **errors** (optional) - expression which specifies how to handle SQL state error codes.
|
||||||
|
Expression syntax and behavior is explained in the `error-handlers` topic. Default
|
||||||
|
*stop*, in other words exit on any error.
|
||||||
|
@ -0,0 +1,33 @@
|
|||||||
|
package io.nosqlbench.activitytype.cockroachdb;
|
||||||
|
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.ActivityDef;
|
||||||
|
import io.nosqlbench.engine.api.activityimpl.ParameterMap;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.postgresql.util.PSQLException;
|
||||||
|
import org.postgresql.util.PSQLState;
|
||||||
|
|
||||||
|
import java.net.SocketTimeoutException;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
public class CockroachActivityTest {
|
||||||
|
@Test
|
||||||
|
public void testErrorNameMapper() {
|
||||||
|
ActivityDef activityDef = new ActivityDef(ParameterMap.parseParams("").orElseThrow());
|
||||||
|
CockroachActivity activity = new CockroachActivity(activityDef);
|
||||||
|
|
||||||
|
// When the Throwable is a SQLException, the error name should be getSQLState()
|
||||||
|
Throwable sqlException = new SQLException("my test exception", "my-test-sql-state");
|
||||||
|
assertEquals("my-test-sql-state", activity.errorNameMapper(sqlException));
|
||||||
|
|
||||||
|
// See PSQLState to string code mapping at the Github source code website
|
||||||
|
// https://github.com/pgjdbc/pgjdbc/blob/master/pgjdbc/src/main/java/org/postgresql/util/PSQLState.java
|
||||||
|
Throwable psqlException = new PSQLException("retry transaction", PSQLState.CONNECTION_FAILURE);
|
||||||
|
assertEquals("08006", activity.errorNameMapper(psqlException));
|
||||||
|
|
||||||
|
// When Throwable is not a SQLException, the error name should be the class name
|
||||||
|
Throwable runtimeException = new SocketTimeoutException("my test runtime exception");
|
||||||
|
assertEquals("SocketTimeoutException", activity.errorNameMapper(runtimeException));
|
||||||
|
}
|
||||||
|
}
|
@ -17,7 +17,7 @@ import org.apache.logging.log4j.Logger;
|
|||||||
|
|
||||||
import javax.sql.DataSource;
|
import javax.sql.DataSource;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.function.Function;
|
||||||
|
|
||||||
// This should not be exposed as as service directly unless it can
|
// This should not be exposed as as service directly unless it can
|
||||||
// be used with a modular JDBC configuration.
|
// be used with a modular JDBC configuration.
|
||||||
@ -27,8 +27,7 @@ public abstract class JDBCActivity extends SimpleActivity {
|
|||||||
private Timer resultTimer;
|
private Timer resultTimer;
|
||||||
private Timer resultSuccessTimer;
|
private Timer resultSuccessTimer;
|
||||||
private Histogram triesHisto;
|
private Histogram triesHisto;
|
||||||
private ExceptionCountMetrics exceptionCount;
|
private int maxTries;
|
||||||
private SQLExceptionCountMetrics sqlExceptionCount;
|
|
||||||
|
|
||||||
protected DataSource dataSource;
|
protected DataSource dataSource;
|
||||||
protected OpSequence<OpDispenser<String>> opSequence;
|
protected OpSequence<OpDispenser<String>> opSequence;
|
||||||
@ -47,6 +46,8 @@ public abstract class JDBCActivity extends SimpleActivity {
|
|||||||
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
|
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
|
||||||
super.onActivityDefUpdate(activityDef);
|
super.onActivityDefUpdate(activityDef);
|
||||||
|
|
||||||
|
this.maxTries = getParams().getOptionalInteger("maxtries").orElse(3);
|
||||||
|
|
||||||
LOGGER.debug("initializing data source");
|
LOGGER.debug("initializing data source");
|
||||||
dataSource = newDataSource();
|
dataSource = newDataSource();
|
||||||
|
|
||||||
@ -70,8 +71,6 @@ public abstract class JDBCActivity extends SimpleActivity {
|
|||||||
resultTimer = ActivityMetrics.timer(getActivityDef(), "result");
|
resultTimer = ActivityMetrics.timer(getActivityDef(), "result");
|
||||||
resultSuccessTimer = ActivityMetrics.timer(getActivityDef(), "result-success");
|
resultSuccessTimer = ActivityMetrics.timer(getActivityDef(), "result-success");
|
||||||
triesHisto = ActivityMetrics.histogram(getActivityDef(), "tries");
|
triesHisto = ActivityMetrics.histogram(getActivityDef(), "tries");
|
||||||
exceptionCount = new ExceptionCountMetrics(getActivityDef());
|
|
||||||
sqlExceptionCount = new SQLExceptionCountMetrics(getActivityDef());
|
|
||||||
|
|
||||||
opSequence = createOpSequence(ReadyJDBCOp::new);
|
opSequence = createOpSequence(ReadyJDBCOp::new);
|
||||||
setDefaultsFromOpSequence(opSequence);
|
setDefaultsFromOpSequence(opSequence);
|
||||||
@ -79,12 +78,20 @@ public abstract class JDBCActivity extends SimpleActivity {
|
|||||||
onActivityDefUpdate(getActivityDef());
|
onActivityDefUpdate(getActivityDef());
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getMaxTries() {
|
public String errorNameMapper(Throwable e) {
|
||||||
return 3;
|
if (e instanceof SQLException) {
|
||||||
|
return ((SQLException) e).getSQLState();
|
||||||
|
}
|
||||||
|
return e.getClass().getSimpleName();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isRetryable(SQLException sqlException) {
|
@Override
|
||||||
return true;
|
public Function<Throwable, String> getErrorNameMapper() {
|
||||||
|
return this::errorNameMapper;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxTries() {
|
||||||
|
return this.maxTries;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DataSource getDataSource() {
|
public DataSource getDataSource() {
|
||||||
@ -110,29 +117,4 @@ public abstract class JDBCActivity extends SimpleActivity {
|
|||||||
public Histogram getTriesHisto() {
|
public Histogram getTriesHisto() {
|
||||||
return triesHisto;
|
return triesHisto;
|
||||||
}
|
}
|
||||||
|
|
||||||
public ExceptionCountMetrics getExceptionCount() {
|
|
||||||
return exceptionCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
public SQLExceptionCountMetrics getSQLExceptionCount() {
|
|
||||||
return sqlExceptionCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static class SQLExceptionCountMetrics {
|
|
||||||
private final ConcurrentHashMap<Integer, Counter> counters = new ConcurrentHashMap<>();
|
|
||||||
private final ActivityDef activityDef;
|
|
||||||
|
|
||||||
private SQLExceptionCountMetrics(ActivityDef activityDef) {
|
|
||||||
this.activityDef = activityDef;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void inc(SQLException e) {
|
|
||||||
Counter c = counters.computeIfAbsent(
|
|
||||||
e.getErrorCode(),
|
|
||||||
k -> ActivityMetrics.counter(activityDef, "errorcodecounts." + k)
|
|
||||||
);
|
|
||||||
c.inc();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -3,13 +3,13 @@ package io.nosqlbench.activitytype.jdbc.impl;
|
|||||||
import com.codahale.metrics.Timer;
|
import com.codahale.metrics.Timer;
|
||||||
import io.nosqlbench.activitytype.jdbc.api.JDBCActivity;
|
import io.nosqlbench.activitytype.jdbc.api.JDBCActivity;
|
||||||
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
|
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
|
||||||
|
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
|
||||||
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||||
import org.apache.logging.log4j.LogManager;
|
import org.apache.logging.log4j.LogManager;
|
||||||
import org.apache.logging.log4j.Logger;
|
import org.apache.logging.log4j.Logger;
|
||||||
|
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.SQLException;
|
|
||||||
import java.sql.Statement;
|
import java.sql.Statement;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.LongFunction;
|
import java.util.function.LongFunction;
|
||||||
@ -40,59 +40,37 @@ public class JDBCAction implements SyncAction {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int maxTries = activity.getMaxTries();
|
int maxTries = activity.getMaxTries();
|
||||||
int errorCode = 0;
|
|
||||||
|
|
||||||
for (int tries = 1; tries <= maxTries; tries++) {
|
for (int tries = 1; tries <= maxTries; tries++) {
|
||||||
errorCode = execute(boundStmt, tries);
|
Exception error = null;
|
||||||
if (errorCode == 0) return 0;
|
long startTimeNanos = System.nanoTime();
|
||||||
}
|
|
||||||
|
|
||||||
LOGGER.debug("Max tries " + maxTries + " exceeded for executing statement " + boundStmt);
|
try (Connection conn = activity.getDataSource().getConnection()) {
|
||||||
return errorCode;
|
Statement jdbcStmt = conn.createStatement();
|
||||||
}
|
jdbcStmt.execute(boundStmt);
|
||||||
|
|
||||||
private int execute(String sql, int tries) {
|
} catch (Exception e) {
|
||||||
long startTimeNano = System.nanoTime();
|
error = e;
|
||||||
Long resultTime = null;
|
|
||||||
|
|
||||||
try (Connection conn = activity.getDataSource().getConnection()) {
|
|
||||||
Statement jdbcStmt = conn.createStatement();
|
|
||||||
jdbcStmt.execute(sql);
|
|
||||||
|
|
||||||
resultTime = System.nanoTime() - startTimeNano;
|
|
||||||
activity.getResultSuccessTimer().update(resultTime, TimeUnit.NANOSECONDS);
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOGGER.debug("Try " + tries + ": failed to execute statement: " + sql, e);
|
|
||||||
|
|
||||||
activity.getExceptionCount().count(e.getClass().getSimpleName());
|
|
||||||
|
|
||||||
if (e instanceof SQLException) {
|
|
||||||
SQLException sqle = (SQLException) e;
|
|
||||||
|
|
||||||
activity.getSQLExceptionCount().inc(sqle);
|
|
||||||
|
|
||||||
// TODO non-retryable exception should return its non-zero error code to runCycle() caller
|
|
||||||
if (!activity.isRetryable(sqle)) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
return sqle.getErrorCode();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 1;
|
long executionTimeNanos = System.nanoTime() - startTimeNanos;
|
||||||
|
|
||||||
} finally {
|
activity.getResultTimer().update(executionTimeNanos, TimeUnit.NANOSECONDS);
|
||||||
if (resultTime == null) {
|
|
||||||
resultTime = System.nanoTime() - startTimeNano;
|
|
||||||
}
|
|
||||||
|
|
||||||
activity.getResultTimer().update(resultTime, TimeUnit.NANOSECONDS);
|
|
||||||
activity.getTriesHisto().update(tries);
|
activity.getTriesHisto().update(tries);
|
||||||
|
|
||||||
|
if (error == null) {
|
||||||
|
activity.getResultSuccessTimer().update(executionTimeNanos, TimeUnit.NANOSECONDS);
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
ErrorDetail detail = activity.getErrorHandler().handleError(error, cycle, executionTimeNanos);
|
||||||
|
if (!detail.isRetryable()) {
|
||||||
|
LOGGER.debug("Exit failure after non-retryable error");
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
LOGGER.trace("Try " + tries + ": successfully executed statement: " + sql);
|
LOGGER.debug("Exit failure after maxretries=" + maxTries);
|
||||||
return 0;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user