mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
SQLStore: Optionally retry queries if sqlite returns database is locked (#56096)
* SQLStore: Retry queries if sqlite returns database is locked * Configurable retries * Apply suggestions from code review Co-authored-by: Christopher Moyer <35463610+chri2547@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
920d2aa599
commit
46fb4081ba
@@ -128,6 +128,12 @@ cache_mode = private
|
|||||||
# For "mysql" only if migrationLocking feature toggle is set. How many seconds to wait before failing to lock the database for the migrations, default is 0.
|
# For "mysql" only if migrationLocking feature toggle is set. How many seconds to wait before failing to lock the database for the migrations, default is 0.
|
||||||
locking_attempt_timeout_sec = 0
|
locking_attempt_timeout_sec = 0
|
||||||
|
|
||||||
|
# For "sqlite" only. How many times to retry query in case of database is locked failures. Default is 0 (disabled).
|
||||||
|
query_retries = 0
|
||||||
|
|
||||||
|
# For "sqlite" only. How many times to retry transaction in case of database is locked failures. Default is 5.
|
||||||
|
transaction_retries = 5
|
||||||
|
|
||||||
#################################### Cache server #############################
|
#################################### Cache server #############################
|
||||||
[remote_cache]
|
[remote_cache]
|
||||||
# Either "redis", "memcached" or "database" default is "database"
|
# Either "redis", "memcached" or "database" default is "database"
|
||||||
|
|||||||
@@ -130,6 +130,12 @@
|
|||||||
# For "mysql" only if migrationLocking feature toggle is set. How many seconds to wait before failing to lock the database for the migrations, default is 0.
|
# For "mysql" only if migrationLocking feature toggle is set. How many seconds to wait before failing to lock the database for the migrations, default is 0.
|
||||||
;locking_attempt_timeout_sec = 0
|
;locking_attempt_timeout_sec = 0
|
||||||
|
|
||||||
|
# For "sqlite" only. How many times to retry query in case of database is locked failures. Default is 0 (disabled).
|
||||||
|
;query_retries = 0
|
||||||
|
|
||||||
|
# For "sqlite" only. How many times to retry transaction in case of database is locked failures. Default is 5.
|
||||||
|
;transaction_retries = 5
|
||||||
|
|
||||||
################################### Data sources #########################
|
################################### Data sources #########################
|
||||||
[datasources]
|
[datasources]
|
||||||
# Upper limit of data sources that Grafana will return. This limit is a temporary configuration and it will be deprecated when pagination will be introduced on the list data sources API.
|
# Upper limit of data sources that Grafana will return. This limit is a temporary configuration and it will be deprecated when pagination will be introduced on the list data sources API.
|
||||||
|
|||||||
@@ -365,6 +365,14 @@ will be stored.
|
|||||||
For "sqlite3" only. [Shared cache](https://www.sqlite.org/sharedcache.html) setting used for connecting to the database. (private, shared)
|
For "sqlite3" only. [Shared cache](https://www.sqlite.org/sharedcache.html) setting used for connecting to the database. (private, shared)
|
||||||
Defaults to `private`.
|
Defaults to `private`.
|
||||||
|
|
||||||
|
### query_retries
|
||||||
|
|
||||||
|
This setting applies to `sqlite` only and controls the number of times the system retries a query when the database is locked. The default value is `0` (disabled).
|
||||||
|
|
||||||
|
### transaction_retries
|
||||||
|
|
||||||
|
This setting applies to `sqlite` only and controls the number of times the system retries a transaction when the database is locked. The default value is `5`.
|
||||||
|
|
||||||
<hr />
|
<hr />
|
||||||
|
|
||||||
## [remote_cache]
|
## [remote_cache]
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ func (ss *SQLStore) createOrg(ctx context.Context, name string, userID int64, en
|
|||||||
Created: time.Now(),
|
Created: time.Now(),
|
||||||
Updated: time.Now(),
|
Updated: time.Now(),
|
||||||
}
|
}
|
||||||
if err := inTransactionWithRetryCtx(ctx, engine, ss.bus, func(sess *DBSession) error {
|
if err := ss.inTransactionWithRetryCtx(ctx, engine, ss.bus, func(sess *DBSession) error {
|
||||||
if isNameTaken, err := isOrgNameTaken(name, 0, sess); err != nil {
|
if isNameTaken, err := isOrgNameTaken(name, 0, sess); err != nil {
|
||||||
return err
|
return err
|
||||||
} else if isNameTaken {
|
} else if isNameTaken {
|
||||||
|
|||||||
@@ -2,11 +2,14 @@ package sqlstore
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"time"
|
||||||
|
|
||||||
"xorm.io/xorm"
|
"xorm.io/xorm"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
|
"github.com/mattn/go-sqlite3"
|
||||||
)
|
)
|
||||||
|
|
||||||
var sessionLogger = log.New("sqlstore.session")
|
var sessionLogger = log.New("sqlstore.session")
|
||||||
@@ -55,18 +58,37 @@ func startSessionOrUseExisting(ctx context.Context, engine *xorm.Engine, beginTr
|
|||||||
// WithDbSession calls the callback with the session in the context (if exists).
|
// WithDbSession calls the callback with the session in the context (if exists).
|
||||||
// Otherwise it creates a new one that is closed upon completion.
|
// Otherwise it creates a new one that is closed upon completion.
|
||||||
// A session is stored in the context if sqlstore.InTransaction() has been been previously called with the same context (and it's not committed/rolledback yet).
|
// A session is stored in the context if sqlstore.InTransaction() has been been previously called with the same context (and it's not committed/rolledback yet).
|
||||||
|
// In case of sqlite3.ErrLocked or sqlite3.ErrBusy failure it will be retried at most five times before giving up.
|
||||||
func (ss *SQLStore) WithDbSession(ctx context.Context, callback DBTransactionFunc) error {
|
func (ss *SQLStore) WithDbSession(ctx context.Context, callback DBTransactionFunc) error {
|
||||||
return withDbSession(ctx, ss.engine, callback)
|
return ss.withDbSession(ctx, ss.engine, callback)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithNewDbSession calls the callback with a new session that is closed upon completion.
|
// WithNewDbSession calls the callback with a new session that is closed upon completion.
|
||||||
|
// In case of sqlite3.ErrLocked or sqlite3.ErrBusy failure it will be retried at most five times before giving up.
|
||||||
func (ss *SQLStore) WithNewDbSession(ctx context.Context, callback DBTransactionFunc) error {
|
func (ss *SQLStore) WithNewDbSession(ctx context.Context, callback DBTransactionFunc) error {
|
||||||
sess := &DBSession{Session: ss.engine.NewSession(), transactionOpen: false}
|
sess := &DBSession{Session: ss.engine.NewSession(), transactionOpen: false}
|
||||||
defer sess.Close()
|
defer sess.Close()
|
||||||
return callback(sess)
|
return ss.withRetry(ctx, callback, 0)(sess)
|
||||||
}
|
}
|
||||||
|
|
||||||
func withDbSession(ctx context.Context, engine *xorm.Engine, callback DBTransactionFunc) error {
|
func (ss *SQLStore) withRetry(ctx context.Context, callback DBTransactionFunc, retry int) DBTransactionFunc {
|
||||||
|
return func(sess *DBSession) error {
|
||||||
|
err := callback(sess)
|
||||||
|
|
||||||
|
ctxLogger := tsclogger.FromContext(ctx)
|
||||||
|
|
||||||
|
var sqlError sqlite3.Error
|
||||||
|
if errors.As(err, &sqlError) && retry < ss.dbCfg.QueryRetries && (sqlError.Code == sqlite3.ErrLocked || sqlError.Code == sqlite3.ErrBusy) {
|
||||||
|
time.Sleep(time.Millisecond * time.Duration(10))
|
||||||
|
ctxLogger.Info("Database locked, sleeping then retrying", "error", err, "retry", retry, "code", sqlError.Code)
|
||||||
|
return ss.withRetry(ctx, callback, retry+1)(sess)
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *SQLStore) withDbSession(ctx context.Context, engine *xorm.Engine, callback DBTransactionFunc) error {
|
||||||
sess, isNew, err := startSessionOrUseExisting(ctx, engine, false)
|
sess, isNew, err := startSessionOrUseExisting(ctx, engine, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -74,7 +96,7 @@ func withDbSession(ctx context.Context, engine *xorm.Engine, callback DBTransact
|
|||||||
if isNew {
|
if isNew {
|
||||||
defer sess.Close()
|
defer sess.Close()
|
||||||
}
|
}
|
||||||
return callback(sess)
|
return ss.withRetry(ctx, callback, 0)(sess)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sess *DBSession) InsertId(bean interface{}) (int64, error) {
|
func (sess *DBSession) InsertId(bean interface{}) (int64, error) {
|
||||||
|
|||||||
62
pkg/services/sqlstore/session_test.go
Normal file
62
pkg/services/sqlstore/session_test.go
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
package sqlstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/mattn/go-sqlite3"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestRetryingOnFailures(t *testing.T) {
|
||||||
|
store := InitTestDB(t)
|
||||||
|
store.dbCfg.QueryRetries = 5
|
||||||
|
|
||||||
|
funcToTest := map[string]func(ctx context.Context, callback DBTransactionFunc) error{
|
||||||
|
"WithDbSession()": store.WithDbSession,
|
||||||
|
"WithNewDbSession()": store.WithNewDbSession,
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, f := range funcToTest {
|
||||||
|
i := 0
|
||||||
|
t.Run(fmt.Sprintf("%s should return the error immediately if it's other than sqlite3.ErrLocked or sqlite3.ErrBusy", name), func(t *testing.T) {
|
||||||
|
err := f(context.Background(), func(sess *DBSession) error {
|
||||||
|
i++
|
||||||
|
return errors.New("some error")
|
||||||
|
})
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Equal(t, i, 1)
|
||||||
|
})
|
||||||
|
|
||||||
|
i = 0
|
||||||
|
t.Run(fmt.Sprintf("%s should return the sqlite3.Error if all retries have failed", name), func(t *testing.T) {
|
||||||
|
err := f(context.Background(), func(sess *DBSession) error {
|
||||||
|
i++
|
||||||
|
return sqlite3.Error{Code: sqlite3.ErrBusy}
|
||||||
|
})
|
||||||
|
require.Error(t, err)
|
||||||
|
var driverErr sqlite3.Error
|
||||||
|
require.ErrorAs(t, err, &driverErr)
|
||||||
|
require.Equal(t, i, store.dbCfg.QueryRetries+1)
|
||||||
|
})
|
||||||
|
|
||||||
|
i = 0
|
||||||
|
t.Run(fmt.Sprintf("%s should not return the error if successive retries succeed", name), func(t *testing.T) {
|
||||||
|
err := f(context.Background(), func(sess *DBSession) error {
|
||||||
|
var err error
|
||||||
|
switch {
|
||||||
|
case i >= store.dbCfg.QueryRetries:
|
||||||
|
err = nil
|
||||||
|
default:
|
||||||
|
err = sqlite3.Error{Code: sqlite3.ErrBusy}
|
||||||
|
}
|
||||||
|
i++
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, i, store.dbCfg.QueryRetries+1)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -455,6 +455,9 @@ func (ss *SQLStore) readConfig() error {
|
|||||||
ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
|
ss.dbCfg.CacheMode = sec.Key("cache_mode").MustString("private")
|
||||||
ss.dbCfg.SkipMigrations = sec.Key("skip_migrations").MustBool()
|
ss.dbCfg.SkipMigrations = sec.Key("skip_migrations").MustBool()
|
||||||
ss.dbCfg.MigrationLockAttemptTimeout = sec.Key("locking_attempt_timeout_sec").MustInt()
|
ss.dbCfg.MigrationLockAttemptTimeout = sec.Key("locking_attempt_timeout_sec").MustInt()
|
||||||
|
|
||||||
|
ss.dbCfg.QueryRetries = sec.Key("query_retries").MustInt()
|
||||||
|
ss.dbCfg.TransactionRetries = sec.Key("transaction_retries").MustInt(5)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -671,4 +674,8 @@ type DatabaseConfig struct {
|
|||||||
UrlQueryParams map[string][]string
|
UrlQueryParams map[string][]string
|
||||||
SkipMigrations bool
|
SkipMigrations bool
|
||||||
MigrationLockAttemptTimeout int
|
MigrationLockAttemptTimeout int
|
||||||
|
// SQLite only
|
||||||
|
QueryRetries int
|
||||||
|
// SQLite only
|
||||||
|
TransactionRetries int
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ var tsclogger = log.New("sqlstore.transactions")
|
|||||||
|
|
||||||
// WithTransactionalDbSession calls the callback with a session within a transaction.
|
// WithTransactionalDbSession calls the callback with a session within a transaction.
|
||||||
func (ss *SQLStore) WithTransactionalDbSession(ctx context.Context, callback DBTransactionFunc) error {
|
func (ss *SQLStore) WithTransactionalDbSession(ctx context.Context, callback DBTransactionFunc) error {
|
||||||
return inTransactionWithRetryCtx(ctx, ss.engine, ss.bus, callback, 0)
|
return ss.inTransactionWithRetryCtx(ctx, ss.engine, ss.bus, callback, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
// InTransaction starts a transaction and calls the fn
|
// InTransaction starts a transaction and calls the fn
|
||||||
@@ -27,13 +27,13 @@ func (ss *SQLStore) InTransaction(ctx context.Context, fn func(ctx context.Conte
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SQLStore) inTransactionWithRetry(ctx context.Context, fn func(ctx context.Context) error, retry int) error {
|
func (ss *SQLStore) inTransactionWithRetry(ctx context.Context, fn func(ctx context.Context) error, retry int) error {
|
||||||
return inTransactionWithRetryCtx(ctx, ss.engine, ss.bus, func(sess *DBSession) error {
|
return ss.inTransactionWithRetryCtx(ctx, ss.engine, ss.bus, func(sess *DBSession) error {
|
||||||
withValue := context.WithValue(ctx, ContextSessionKey{}, sess)
|
withValue := context.WithValue(ctx, ContextSessionKey{}, sess)
|
||||||
return fn(withValue)
|
return fn(withValue)
|
||||||
}, retry)
|
}, retry)
|
||||||
}
|
}
|
||||||
|
|
||||||
func inTransactionWithRetryCtx(ctx context.Context, engine *xorm.Engine, bus bus.Bus, callback DBTransactionFunc, retry int) error {
|
func (ss *SQLStore) inTransactionWithRetryCtx(ctx context.Context, engine *xorm.Engine, bus bus.Bus, callback DBTransactionFunc, retry int) error {
|
||||||
sess, isNew, err := startSessionOrUseExisting(ctx, engine, true)
|
sess, isNew, err := startSessionOrUseExisting(ctx, engine, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -60,14 +60,14 @@ func inTransactionWithRetryCtx(ctx context.Context, engine *xorm.Engine, bus bus
|
|||||||
|
|
||||||
// special handling of database locked errors for sqlite, then we can retry 5 times
|
// special handling of database locked errors for sqlite, then we can retry 5 times
|
||||||
var sqlError sqlite3.Error
|
var sqlError sqlite3.Error
|
||||||
if errors.As(err, &sqlError) && retry < 5 && (sqlError.Code == sqlite3.ErrLocked || sqlError.Code == sqlite3.ErrBusy) {
|
if errors.As(err, &sqlError) && retry < ss.dbCfg.TransactionRetries && (sqlError.Code == sqlite3.ErrLocked || sqlError.Code == sqlite3.ErrBusy) {
|
||||||
if rollErr := sess.Rollback(); rollErr != nil {
|
if rollErr := sess.Rollback(); rollErr != nil {
|
||||||
return fmt.Errorf("rolling back transaction due to error failed: %s: %w", rollErr, err)
|
return fmt.Errorf("rolling back transaction due to error failed: %s: %w", rollErr, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
time.Sleep(time.Millisecond * time.Duration(10))
|
time.Sleep(time.Millisecond * time.Duration(10))
|
||||||
ctxLogger.Info("Database locked, sleeping then retrying", "error", err, "retry", retry)
|
ctxLogger.Info("Database locked, sleeping then retrying", "error", err, "retry", retry, "code", sqlError.Code)
|
||||||
return inTransactionWithRetryCtx(ctx, engine, bus, callback, retry+1)
|
return ss.inTransactionWithRetryCtx(ctx, engine, bus, callback, retry+1)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user