mirror of
https://github.com/nosqlbench/nosqlbench.git
synced 2025-02-25 18:55:28 -06:00
Refactor JDBCActivity to use NBErrorHandler
This commit is contained in:
@@ -17,7 +17,7 @@ import org.apache.logging.log4j.Logger;
|
||||
|
||||
import javax.sql.DataSource;
|
||||
import java.sql.SQLException;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Function;
|
||||
|
||||
// This should not be exposed as as service directly unless it can
|
||||
// be used with a modular JDBC configuration.
|
||||
@@ -27,8 +27,7 @@ public abstract class JDBCActivity extends SimpleActivity {
|
||||
private Timer resultTimer;
|
||||
private Timer resultSuccessTimer;
|
||||
private Histogram triesHisto;
|
||||
private ExceptionCountMetrics exceptionCount;
|
||||
private SQLExceptionCountMetrics sqlExceptionCount;
|
||||
private int maxTries;
|
||||
|
||||
protected DataSource dataSource;
|
||||
protected OpSequence<OpDispenser<String>> opSequence;
|
||||
@@ -47,6 +46,8 @@ public abstract class JDBCActivity extends SimpleActivity {
|
||||
public synchronized void onActivityDefUpdate(ActivityDef activityDef) {
|
||||
super.onActivityDefUpdate(activityDef);
|
||||
|
||||
this.maxTries = getParams().getOptionalInteger("maxtries").orElse(3);
|
||||
|
||||
LOGGER.debug("initializing data source");
|
||||
dataSource = newDataSource();
|
||||
|
||||
@@ -70,8 +71,6 @@ public abstract class JDBCActivity extends SimpleActivity {
|
||||
resultTimer = ActivityMetrics.timer(getActivityDef(), "result");
|
||||
resultSuccessTimer = ActivityMetrics.timer(getActivityDef(), "result-success");
|
||||
triesHisto = ActivityMetrics.histogram(getActivityDef(), "tries");
|
||||
exceptionCount = new ExceptionCountMetrics(getActivityDef());
|
||||
sqlExceptionCount = new SQLExceptionCountMetrics(getActivityDef());
|
||||
|
||||
opSequence = createOpSequence(ReadyJDBCOp::new);
|
||||
setDefaultsFromOpSequence(opSequence);
|
||||
@@ -79,12 +78,20 @@ public abstract class JDBCActivity extends SimpleActivity {
|
||||
onActivityDefUpdate(getActivityDef());
|
||||
}
|
||||
|
||||
public int getMaxTries() {
|
||||
return 3;
|
||||
public String errorNameMapper(Throwable e) {
|
||||
if (e instanceof SQLException) {
|
||||
return ((SQLException) e).getSQLState();
|
||||
}
|
||||
return e.getClass().getSimpleName();
|
||||
}
|
||||
|
||||
public boolean isRetryable(SQLException sqlException) {
|
||||
return true;
|
||||
@Override
|
||||
public Function<Throwable, String> getErrorNameMapper() {
|
||||
return this::errorNameMapper;
|
||||
}
|
||||
|
||||
public int getMaxTries() {
|
||||
return this.maxTries;
|
||||
}
|
||||
|
||||
public DataSource getDataSource() {
|
||||
@@ -110,29 +117,4 @@ public abstract class JDBCActivity extends SimpleActivity {
|
||||
public Histogram getTriesHisto() {
|
||||
return triesHisto;
|
||||
}
|
||||
|
||||
public ExceptionCountMetrics getExceptionCount() {
|
||||
return exceptionCount;
|
||||
}
|
||||
|
||||
public SQLExceptionCountMetrics getSQLExceptionCount() {
|
||||
return sqlExceptionCount;
|
||||
}
|
||||
|
||||
public static class SQLExceptionCountMetrics {
|
||||
private final ConcurrentHashMap<Integer, Counter> counters = new ConcurrentHashMap<>();
|
||||
private final ActivityDef activityDef;
|
||||
|
||||
private SQLExceptionCountMetrics(ActivityDef activityDef) {
|
||||
this.activityDef = activityDef;
|
||||
}
|
||||
|
||||
public void inc(SQLException e) {
|
||||
Counter c = counters.computeIfAbsent(
|
||||
e.getErrorCode(),
|
||||
k -> ActivityMetrics.counter(activityDef, "errorcodecounts." + k)
|
||||
);
|
||||
c.inc();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,13 +3,13 @@ package io.nosqlbench.activitytype.jdbc.impl;
|
||||
import com.codahale.metrics.Timer;
|
||||
import io.nosqlbench.activitytype.jdbc.api.JDBCActivity;
|
||||
import io.nosqlbench.engine.api.activityapi.core.SyncAction;
|
||||
import io.nosqlbench.engine.api.activityapi.errorhandling.modular.ErrorDetail;
|
||||
import io.nosqlbench.engine.api.activityapi.planning.OpSequence;
|
||||
import io.nosqlbench.engine.api.activityimpl.OpDispenser;
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.SQLException;
|
||||
import java.sql.Statement;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.LongFunction;
|
||||
@@ -40,59 +40,37 @@ public class JDBCAction implements SyncAction {
|
||||
}
|
||||
|
||||
int maxTries = activity.getMaxTries();
|
||||
int errorCode = 0;
|
||||
|
||||
for (int tries = 1; tries <= maxTries; tries++) {
|
||||
errorCode = execute(boundStmt, tries);
|
||||
if (errorCode == 0) return 0;
|
||||
}
|
||||
Exception error = null;
|
||||
long startTimeNanos = System.nanoTime();
|
||||
|
||||
LOGGER.debug("Max tries " + maxTries + " exceeded for executing statement " + boundStmt);
|
||||
return errorCode;
|
||||
}
|
||||
try (Connection conn = activity.getDataSource().getConnection()) {
|
||||
Statement jdbcStmt = conn.createStatement();
|
||||
jdbcStmt.execute(boundStmt);
|
||||
|
||||
private int execute(String sql, int tries) {
|
||||
long startTimeNano = System.nanoTime();
|
||||
Long resultTime = null;
|
||||
|
||||
try (Connection conn = activity.getDataSource().getConnection()) {
|
||||
Statement jdbcStmt = conn.createStatement();
|
||||
jdbcStmt.execute(sql);
|
||||
|
||||
resultTime = System.nanoTime() - startTimeNano;
|
||||
activity.getResultSuccessTimer().update(resultTime, TimeUnit.NANOSECONDS);
|
||||
|
||||
} catch (Exception e) {
|
||||
LOGGER.debug("Try " + tries + ": failed to execute statement: " + sql, e);
|
||||
|
||||
activity.getExceptionCount().count(e.getClass().getSimpleName());
|
||||
|
||||
if (e instanceof SQLException) {
|
||||
SQLException sqle = (SQLException) e;
|
||||
|
||||
activity.getSQLExceptionCount().inc(sqle);
|
||||
|
||||
// TODO non-retryable exception should return its non-zero error code to runCycle() caller
|
||||
if (!activity.isRetryable(sqle)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return sqle.getErrorCode();
|
||||
} catch (Exception e) {
|
||||
error = e;
|
||||
}
|
||||
|
||||
return 1;
|
||||
long executionTimeNanos = System.nanoTime() - startTimeNanos;
|
||||
|
||||
} finally {
|
||||
if (resultTime == null) {
|
||||
resultTime = System.nanoTime() - startTimeNano;
|
||||
}
|
||||
|
||||
activity.getResultTimer().update(resultTime, TimeUnit.NANOSECONDS);
|
||||
activity.getResultTimer().update(executionTimeNanos, TimeUnit.NANOSECONDS);
|
||||
activity.getTriesHisto().update(tries);
|
||||
|
||||
if (error == null) {
|
||||
activity.getResultSuccessTimer().update(executionTimeNanos, TimeUnit.NANOSECONDS);
|
||||
return 0;
|
||||
} else {
|
||||
ErrorDetail detail = activity.getErrorHandler().handleError(error, cycle, executionTimeNanos);
|
||||
if (!detail.isRetryable()) {
|
||||
LOGGER.debug("Exit failure after non-retryable error");
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
LOGGER.trace("Try " + tries + ": successfully executed statement: " + sql);
|
||||
return 0;
|
||||
LOGGER.debug("Exit failure after maxretries=" + maxTries);
|
||||
return 1;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user