2018-06-07 14:54:36 -05:00
package sqlstore
import (
"context"
2022-10-17 13:23:44 -05:00
"errors"
2022-11-28 09:48:44 -06:00
"fmt"
2018-06-07 14:54:36 -05:00
"reflect"
2022-10-17 13:23:44 -05:00
"time"
2018-06-07 14:54:36 -05:00
2023-01-30 02:21:27 -06:00
"github.com/mattn/go-sqlite3"
2023-01-09 08:41:15 -06:00
"go.opentelemetry.io/otel/attribute"
2020-04-01 08:57:21 -05:00
"xorm.io/xorm"
2022-02-08 08:02:23 -06:00
"github.com/grafana/grafana/pkg/infra/log"
2023-01-09 08:41:15 -06:00
"github.com/grafana/grafana/pkg/infra/tracing"
2022-11-28 09:48:44 -06:00
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
2022-11-24 05:12:50 -06:00
"github.com/grafana/grafana/pkg/util/errutil"
"github.com/grafana/grafana/pkg/util/retryer"
2018-06-07 14:54:36 -05:00
)
2022-02-08 08:02:23 -06:00
// sessionLogger is the logger (named "sqlstore.session") used when reporting
// session reuse from the context.
var sessionLogger = log . New ( "sqlstore.session" )
2022-11-24 05:12:50 -06:00
// ErrMaximumRetriesReached is the base error returned by the lock-retry logic
// when a callback keeps failing with a locked/busy SQLite database and the
// attempt counter reaches the configured number of query retries.
var ErrMaximumRetriesReached = errutil . NewBase ( errutil . StatusInternal , "sqlstore.max-retries-reached" )
2022-02-08 08:02:23 -06:00
2018-06-07 14:54:36 -05:00
type DBSession struct {
* xorm . Session
2022-02-08 08:02:23 -06:00
transactionOpen bool
events [ ] interface { }
2018-06-07 14:54:36 -05:00
}
2022-02-01 07:51:22 -06:00
// DBTransactionFunc is the callback signature executed with a DBSession; a
// non-nil error is propagated to the caller of the session helper.
type DBTransactionFunc func ( sess * DBSession ) error
2018-06-07 14:54:36 -05:00
func ( sess * DBSession ) publishAfterCommit ( msg interface { } ) {
sess . events = append ( sess . events , msg )
2022-06-28 07:32:25 -05:00
}
func ( sess * DBSession ) PublishAfterCommit ( msg interface { } ) {
sess . events = append ( sess . events , msg )
2018-06-07 14:54:36 -05:00
}
2023-01-09 08:41:15 -06:00
func startSessionOrUseExisting ( ctx context . Context , engine * xorm . Engine , beginTran bool , tracer tracing . Tracer ) ( * DBSession , bool , tracing . Span , error ) {
2020-03-23 07:37:53 -05:00
value := ctx . Value ( ContextSessionKey { } )
2018-06-07 14:54:36 -05:00
var sess * DBSession
sess , ok := value . ( * DBSession )
2018-06-18 07:50:36 -05:00
if ok {
2022-09-20 11:32:06 -05:00
ctxLogger := sessionLogger . FromContext ( ctx )
ctxLogger . Debug ( "reusing existing session" , "transaction" , sess . transactionOpen )
2021-05-27 06:55:33 -05:00
sess . Session = sess . Session . Context ( ctx )
2023-01-09 08:41:15 -06:00
return sess , false , nil , nil
2018-06-07 14:54:36 -05:00
}
2023-01-09 08:41:15 -06:00
tctx , span := tracer . Start ( ctx , "open session" )
span . SetAttributes ( "transaction" , beginTran , attribute . Key ( "transaction" ) . Bool ( beginTran ) )
2022-02-08 08:02:23 -06:00
newSess := & DBSession { Session : engine . NewSession ( ) , transactionOpen : beginTran }
2023-01-09 08:41:15 -06:00
2018-06-18 07:50:36 -05:00
if beginTran {
err := newSess . Begin ( )
if err != nil {
2023-01-09 08:41:15 -06:00
return nil , false , span , err
2018-06-18 07:50:36 -05:00
}
}
2023-01-09 08:41:15 -06:00
newSess . Session = newSess . Session . Context ( tctx )
2021-05-27 06:55:33 -05:00
2023-01-09 08:41:15 -06:00
return newSess , true , span , nil
2018-06-07 14:54:36 -05:00
}
2022-09-29 07:55:47 -05:00
// WithDbSession calls the callback with the session in the context (if exists).
// Otherwise it creates a new one that is closed upon completion.
2023-01-23 07:17:56 -06:00
// A session is stored in the context if sqlstore.InTransaction() has been previously called with the same context (and it's not committed/rolledback yet).
2022-10-17 13:23:44 -05:00
// In case of sqlite3.ErrLocked or sqlite3.ErrBusy failure it will be retried at most five times before giving up.
2022-02-01 07:51:22 -06:00
func ( ss * SQLStore ) WithDbSession ( ctx context . Context , callback DBTransactionFunc ) error {
2022-10-17 13:23:44 -05:00
return ss . withDbSession ( ctx , ss . engine , callback )
2019-05-16 05:39:59 -05:00
}
2022-09-29 07:55:47 -05:00
// WithNewDbSession calls the callback with a new session that is closed upon completion.
2022-10-17 13:23:44 -05:00
// In case of sqlite3.ErrLocked or sqlite3.ErrBusy failure it will be retried at most five times before giving up.
2022-09-29 07:55:47 -05:00
func ( ss * SQLStore ) WithNewDbSession ( ctx context . Context , callback DBTransactionFunc ) error {
sess := & DBSession { Session : ss . engine . NewSession ( ) , transactionOpen : false }
defer sess . Close ( )
2022-11-24 05:12:50 -06:00
retry := 0
return retryer . Retry ( ss . retryOnLocks ( ctx , callback , sess , retry ) , ss . dbCfg . QueryRetries , time . Millisecond * time . Duration ( 10 ) , time . Second )
2022-09-29 07:55:47 -05:00
}
2022-11-24 05:12:50 -06:00
func ( ss * SQLStore ) retryOnLocks ( ctx context . Context , callback DBTransactionFunc , sess * DBSession , retry int ) func ( ) ( retryer . RetrySignal , error ) {
return func ( ) ( retryer . RetrySignal , error ) {
retry ++
2022-10-17 13:23:44 -05:00
err := callback ( sess )
ctxLogger := tsclogger . FromContext ( ctx )
var sqlError sqlite3 . Error
2022-11-24 05:12:50 -06:00
if errors . As ( err , & sqlError ) && ( sqlError . Code == sqlite3 . ErrLocked || sqlError . Code == sqlite3 . ErrBusy ) {
2022-10-17 13:23:44 -05:00
ctxLogger . Info ( "Database locked, sleeping then retrying" , "error" , err , "retry" , retry , "code" , sqlError . Code )
2022-11-24 05:12:50 -06:00
// retryer immediately returns the error (if there is one) without checking the response
// therefore we only have to send it if we have reached the maximum retries
if retry == ss . dbCfg . QueryRetries {
return retryer . FuncError , ErrMaximumRetriesReached . Errorf ( "retry %d: %w" , retry , err )
}
return retryer . FuncFailure , nil
2022-10-17 13:23:44 -05:00
}
2022-11-24 05:12:50 -06:00
if err != nil {
return retryer . FuncError , err
}
return retryer . FuncComplete , nil
2022-10-17 13:23:44 -05:00
}
}
func ( ss * SQLStore ) withDbSession ( ctx context . Context , engine * xorm . Engine , callback DBTransactionFunc ) error {
2023-01-09 08:41:15 -06:00
sess , isNew , span , err := startSessionOrUseExisting ( ctx , engine , false , ss . tracer )
2022-02-08 08:02:23 -06:00
if err != nil {
return err
}
if isNew {
2023-01-09 08:41:15 -06:00
defer func ( ) {
if span != nil {
span . End ( )
}
sess . Close ( )
} ( )
2022-02-08 08:02:23 -06:00
}
2022-11-24 05:12:50 -06:00
retry := 0
return retryer . Retry ( ss . retryOnLocks ( ctx , callback , sess , retry ) , ss . dbCfg . QueryRetries , time . Millisecond * time . Duration ( 10 ) , time . Second )
2018-06-07 14:54:36 -05:00
}
2023-03-02 05:01:36 -06:00
func ( sess * DBSession ) InsertId ( bean interface { } , dialect migrator . Dialect ) error {
2018-06-07 14:54:36 -05:00
table := sess . DB ( ) . Mapper . Obj2Table ( getTypeName ( bean ) )
2019-10-22 07:08:18 -05:00
if err := dialect . PreInsertId ( table , sess . Session ) ; err != nil {
2023-03-02 05:01:36 -06:00
return err
2019-10-22 07:08:18 -05:00
}
2023-03-02 05:01:36 -06:00
_ , err := sess . Session . InsertOne ( bean )
2019-10-22 07:08:18 -05:00
if err != nil {
2023-03-02 05:01:36 -06:00
return err
2019-10-22 07:08:18 -05:00
}
if err := dialect . PostInsertId ( table , sess . Session ) ; err != nil {
2023-03-02 05:01:36 -06:00
return err
2019-10-22 07:08:18 -05:00
}
2018-06-07 14:54:36 -05:00
2023-03-02 05:01:36 -06:00
return nil
2018-06-07 14:54:36 -05:00
}
2022-11-28 09:48:44 -06:00
// WithReturningID executes the given insert query and returns the id of the
// inserted row.
//
// For the Postgres driver the query is suffixed with "RETURNING id" and the id
// is read from the result row. For every other driver the query is executed
// and the id is taken from the result's LastInsertId.
//
// On failure the error is returned together with whatever value id holds at
// that point.
func (sess *DBSession) WithReturningID(driverName string, query string, args []interface{}) (int64, error) {
	var id int64

	if driverName == migrator.Postgres {
		query = fmt.Sprintf("%s RETURNING id", query)
		_, err := sess.SQL(query, args...).Get(&id)
		return id, err
	}

	// Exec takes the SQL text followed by its arguments in a single variadic list.
	execArgs := make([]interface{}, 0, len(args)+1)
	execArgs = append(execArgs, query)
	execArgs = append(execArgs, args...)

	res, err := sess.Exec(execArgs...)
	if err != nil {
		return id, err
	}
	return res.LastInsertId()
}
2018-06-07 14:54:36 -05:00
// getTypeName returns the name of bean's type, following pointers until a
// non-pointer type is reached (so *T and **T both yield the name of T).
// It returns "" for a nil bean, for which reflect.TypeOf yields a nil Type,
// and for unnamed types.
func getTypeName(bean interface{}) string {
	t := reflect.TypeOf(bean)
	if t == nil {
		// A nil interface carries no type information; calling Kind on the
		// nil Type would panic.
		return ""
	}
	for t.Kind() == reflect.Ptr {
		t = t.Elem()
	}
	return t.Name()
}