mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
SQLStore: Refactor query retries to use exponential backoff (#58559)
This commit is contained in:
parent
a2f1d2e102
commit
f3da48bd50
@ -9,10 +9,13 @@ import (
|
||||
"xorm.io/xorm"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/util/errutil"
|
||||
"github.com/grafana/grafana/pkg/util/retryer"
|
||||
"github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
var sessionLogger = log.New("sqlstore.session")
|
||||
var ErrMaximumRetriesReached = errutil.NewBase(errutil.StatusInternal, "sqlstore.max-retries-reached")
|
||||
|
||||
type DBSession struct {
|
||||
*xorm.Session
|
||||
@ -68,23 +71,34 @@ func (ss *SQLStore) WithDbSession(ctx context.Context, callback DBTransactionFun
|
||||
func (ss *SQLStore) WithNewDbSession(ctx context.Context, callback DBTransactionFunc) error {
|
||||
sess := &DBSession{Session: ss.engine.NewSession(), transactionOpen: false}
|
||||
defer sess.Close()
|
||||
return ss.withRetry(ctx, callback, 0)(sess)
|
||||
retry := 0
|
||||
return retryer.Retry(ss.retryOnLocks(ctx, callback, sess, retry), ss.dbCfg.QueryRetries, time.Millisecond*time.Duration(10), time.Second)
|
||||
}
|
||||
|
||||
func (ss *SQLStore) withRetry(ctx context.Context, callback DBTransactionFunc, retry int) DBTransactionFunc {
|
||||
return func(sess *DBSession) error {
|
||||
func (ss *SQLStore) retryOnLocks(ctx context.Context, callback DBTransactionFunc, sess *DBSession, retry int) func() (retryer.RetrySignal, error) {
|
||||
return func() (retryer.RetrySignal, error) {
|
||||
retry++
|
||||
|
||||
err := callback(sess)
|
||||
|
||||
ctxLogger := tsclogger.FromContext(ctx)
|
||||
|
||||
var sqlError sqlite3.Error
|
||||
if errors.As(err, &sqlError) && retry < ss.dbCfg.QueryRetries && (sqlError.Code == sqlite3.ErrLocked || sqlError.Code == sqlite3.ErrBusy) {
|
||||
time.Sleep(time.Millisecond * time.Duration(10))
|
||||
if errors.As(err, &sqlError) && (sqlError.Code == sqlite3.ErrLocked || sqlError.Code == sqlite3.ErrBusy) {
|
||||
ctxLogger.Info("Database locked, sleeping then retrying", "error", err, "retry", retry, "code", sqlError.Code)
|
||||
return ss.withRetry(ctx, callback, retry+1)(sess)
|
||||
// retryer immediately returns the error (if there is one) without checking the response
|
||||
// therefore we only have to send it if we have reached the maximum retries
|
||||
if retry == ss.dbCfg.QueryRetries {
|
||||
return retryer.FuncError, ErrMaximumRetriesReached.Errorf("retry %d: %w", retry, err)
|
||||
}
|
||||
return retryer.FuncFailure, nil
|
||||
}
|
||||
|
||||
return err
|
||||
if err != nil {
|
||||
return retryer.FuncError, err
|
||||
}
|
||||
|
||||
return retryer.FuncComplete, nil
|
||||
}
|
||||
}
|
||||
|
||||
@ -96,7 +110,8 @@ func (ss *SQLStore) withDbSession(ctx context.Context, engine *xorm.Engine, call
|
||||
if isNew {
|
||||
defer sess.Close()
|
||||
}
|
||||
return ss.withRetry(ctx, callback, 0)(sess)
|
||||
retry := 0
|
||||
return retryer.Retry(ss.retryOnLocks(ctx, callback, sess, retry), ss.dbCfg.QueryRetries, time.Millisecond*time.Duration(10), time.Second)
|
||||
}
|
||||
|
||||
func (sess *DBSession) InsertId(bean interface{}) (int64, error) {
|
||||
|
@ -15,48 +15,51 @@ func TestRetryingOnFailures(t *testing.T) {
|
||||
store.dbCfg.QueryRetries = 5
|
||||
|
||||
funcToTest := map[string]func(ctx context.Context, callback DBTransactionFunc) error{
|
||||
"WithDbSession()": store.WithDbSession,
|
||||
"WithNewDbSession()": store.WithNewDbSession,
|
||||
"WithDbSession": store.WithDbSession,
|
||||
"WithNewDbSession": store.WithNewDbSession,
|
||||
}
|
||||
|
||||
for name, f := range funcToTest {
|
||||
i := 0
|
||||
t.Run(fmt.Sprintf("%s should return the error immediately if it's other than sqlite3.ErrLocked or sqlite3.ErrBusy", name), func(t *testing.T) {
|
||||
err := f(context.Background(), func(sess *DBSession) error {
|
||||
i := 0
|
||||
callback := func(sess *DBSession) error {
|
||||
i++
|
||||
return errors.New("some error")
|
||||
})
|
||||
}
|
||||
err := f(context.Background(), callback)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, i, 1)
|
||||
require.Equal(t, 1, i)
|
||||
})
|
||||
|
||||
i = 0
|
||||
t.Run(fmt.Sprintf("%s should return the sqlite3.Error if all retries have failed", name), func(t *testing.T) {
|
||||
err := f(context.Background(), func(sess *DBSession) error {
|
||||
i := 0
|
||||
callback := func(sess *DBSession) error {
|
||||
i++
|
||||
return sqlite3.Error{Code: sqlite3.ErrBusy}
|
||||
})
|
||||
}
|
||||
err := f(context.Background(), callback)
|
||||
require.Error(t, err)
|
||||
var driverErr sqlite3.Error
|
||||
require.ErrorAs(t, err, &driverErr)
|
||||
require.Equal(t, i, store.dbCfg.QueryRetries+1)
|
||||
require.Equal(t, store.dbCfg.QueryRetries, i)
|
||||
})
|
||||
|
||||
i = 0
|
||||
t.Run(fmt.Sprintf("%s should not return the error if successive retries succeed", name), func(t *testing.T) {
|
||||
err := f(context.Background(), func(sess *DBSession) error {
|
||||
i := 0
|
||||
callback := func(sess *DBSession) error {
|
||||
i++
|
||||
var err error
|
||||
switch {
|
||||
case i >= store.dbCfg.QueryRetries:
|
||||
case store.dbCfg.QueryRetries == i:
|
||||
err = nil
|
||||
default:
|
||||
err = sqlite3.Error{Code: sqlite3.ErrBusy}
|
||||
}
|
||||
i++
|
||||
return err
|
||||
})
|
||||
}
|
||||
err := f(context.Background(), callback)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, i, store.dbCfg.QueryRetries+1)
|
||||
require.Equal(t, store.dbCfg.QueryRetries, i)
|
||||
})
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user