diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 2972c9c7614..18248d6667e 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -12,8 +12,8 @@ type Msg interface{} var ErrHandlerNotFound = errors.New("handler not found") -type TransactionWrapper interface { - Wrap(ctx context.Context, fn func(ctx context.Context) error) error +type TransactionManager interface { + InTransaction(ctx context.Context, fn func(ctx context.Context) error) error } type Bus interface { @@ -35,19 +35,18 @@ type Bus interface { // SetTransactionManager allows the user to replace the internal // noop TransactionManager that is responsible for manageing // transactions in `InTransaction` - SetTransactionManager(tm TransactionWrapper) + SetTransactionManager(tm TransactionManager) } func (b *InProcBus) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { - return b.transactionWrapper.Wrap(ctx, fn) + return b.txMng.InTransaction(ctx, fn) } type InProcBus struct { handlers map[string]HandlerFunc listeners map[string][]HandlerFunc wildcardListeners []HandlerFunc - - transactionWrapper TransactionWrapper + txMng TransactionManager } // temp stuff, not sure how to handle bus instance, and init yet @@ -58,8 +57,7 @@ func New() Bus { bus.handlers = make(map[string]HandlerFunc) bus.listeners = make(map[string][]HandlerFunc) bus.wildcardListeners = make([]HandlerFunc, 0) - - bus.transactionWrapper = &noopTransactionManager{} + bus.txMng = &noopTransactionManager{} return bus } @@ -69,12 +67,8 @@ func GetBus() Bus { return globalBus } -func SetTransactionManager(tm TransactionWrapper) { - globalBus.SetTransactionManager(tm) -} - -func (b *InProcBus) SetTransactionManager(tm TransactionWrapper) { - b.transactionWrapper = tm +func (b *InProcBus) SetTransactionManager(tm TransactionManager) { + b.txMng = tm } func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error { @@ -213,6 +207,6 @@ func ClearBusHandlers() { type noopTransactionManager struct{} -func (*noopTransactionManager) Wrap(ctx context.Context, fn func(ctx context.Context) error) error { +func (*noopTransactionManager) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { return nil } diff --git a/pkg/models/transaction.go b/pkg/models/transaction.go new file mode 100644 index 00000000000..e07b2a9e397 --- /dev/null +++ b/pkg/models/transaction.go @@ -0,0 +1,7 @@ +package models + +import "context" + +type TransactionManager interface { + InTransaction(ctx context.Context, fn func(ctx context.Context) error) error +} diff --git a/pkg/services/sqlstore/session.go b/pkg/services/sqlstore/session.go new file mode 100644 index 00000000000..307d3ee1eeb --- /dev/null +++ b/pkg/services/sqlstore/session.go @@ -0,0 +1,63 @@ +package sqlstore + +import ( + "context" + "reflect" + + "github.com/go-xorm/xorm" +) + +type DBSession struct { + *xorm.Session + events []interface{} +} + +type dbTransactionFunc func(sess *DBSession) error + +func (sess *DBSession) publishAfterCommit(msg interface{}) { + sess.events = append(sess.events, msg) +} + +func newSession() *DBSession { + return &DBSession{Session: x.NewSession()} +} + +func startSession(ctx context.Context) *DBSession { + value := ctx.Value(ContextSessionName) + var sess *DBSession + sess, ok := value.(*DBSession) + + if !ok { + newSess := newSession() + newSess.Begin() + return newSess + } + + return sess +} + +func withDbSession(ctx context.Context, callback dbTransactionFunc) error { + sess := startSession(ctx) + + return callback(sess) +} + +func (sess *DBSession) InsertId(bean interface{}) (int64, error) { + table := sess.DB().Mapper.Obj2Table(getTypeName(bean)) + + dialect.PreInsertId(table, sess.Session) + + id, err := sess.Session.InsertOne(bean) + + dialect.PostInsertId(table, sess.Session) + + return id, err +} + +func getTypeName(bean interface{}) (res string) { + t := reflect.TypeOf(bean) + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + return t.Name() +} diff --git a/pkg/services/sqlstore/sqlstore.go b/pkg/services/sqlstore/sqlstore.go index a36e6cb15d1..f97134fd0d5 100644 --- a/pkg/services/sqlstore/sqlstore.go +++ b/pkg/services/sqlstore/sqlstore.go @@ -23,11 +23,10 @@ import ( "github.com/go-sql-driver/mysql" "github.com/go-xorm/xorm" - _ "github.com/lib/pq" - _ "github.com/mattn/go-sqlite3" - sqlite3 "github.com/mattn/go-sqlite3" _ "github.com/grafana/grafana/pkg/tsdb/mssql" + _ "github.com/lib/pq" + _ "github.com/mattn/go-sqlite3" ) var ( @@ -82,9 +81,7 @@ func (ss *SqlStore) Init() error { // Init repo instances annotations.SetRepository(&SqlAnnotationRepo{}) - ss.Bus.SetTransactionManager(&SQLTransactionManager{ - engine: ss.engine, - }) + ss.Bus.SetTransactionManager(ss) // ensure admin user if ss.skipEnsureAdmin { @@ -94,57 +91,10 @@ func (ss *SqlStore) Init() error { return ss.ensureAdminUser() } -// SQLTransactionManager begin/end transaction -type SQLTransactionManager struct { - engine *xorm.Engine -} - -func (stm *SQLTransactionManager) Wrap(ctx context.Context, fn func(ctx context.Context) error) error { - return stm.wrapInternal(ctx, fn, 0) -} - -func (stm *SQLTransactionManager) wrapInternal(ctx context.Context, fn func(ctx context.Context) error, retry int) error { - sess := startSession(ctx) - defer sess.Close() - - withValue := context.WithValue(ctx, ContextSessionName, sess) - - err := fn(withValue) - - // special handling of database locked errors for sqlite, then we can retry 3 times - if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 { - if sqlError.Code == sqlite3.ErrLocked { - sess.Rollback() - time.Sleep(time.Millisecond * time.Duration(10)) - sqlog.Info("Database table locked, sleeping then retrying", "retry", retry) - return stm.wrapInternal(ctx, fn, retry+1) - } - } - - if err != nil { - sess.Rollback() - return err - } - - if err = sess.Commit(); err != nil { - return err - } - - if len(sess.events) > 0 { - for _, e := range sess.events { - if err = bus.Publish(e); err != nil { - log.Error(3, "Failed to publish event after commit", err) - } - } - } - - return nil -} - func (ss *SqlStore) ensureAdminUser() error { systemUserCountQuery := m.GetSystemUserCountStatsQuery{} - err := bus.InTransaction(context.Background(), func(ctx context.Context) error { + err := ss.InTransaction(context.Background(), func(ctx context.Context) error { err := bus.DispatchCtx(ctx, &systemUserCountQuery) if err != nil { diff --git a/pkg/services/sqlstore/shared.go b/pkg/services/sqlstore/transactions.go similarity index 56% rename from pkg/services/sqlstore/shared.go rename to pkg/services/sqlstore/transactions.go index 7928f22b9f3..959d21c0bf1 100644 --- a/pkg/services/sqlstore/shared.go +++ b/pkg/services/sqlstore/transactions.go @@ -2,52 +2,53 @@ package sqlstore import ( "context" - "reflect" "time" - "github.com/go-xorm/xorm" "github.com/grafana/grafana/pkg/bus" "github.com/grafana/grafana/pkg/log" sqlite3 "github.com/mattn/go-sqlite3" ) -type DBSession struct { - *xorm.Session - events []interface{} +func (ss *SqlStore) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { + return ss.inTransactionWithRetry(ctx, fn, 0) } -type dbTransactionFunc func(sess *DBSession) error +func (ss *SqlStore) inTransactionWithRetry(ctx context.Context, fn func(ctx context.Context) error, retry int) error { + sess := startSession(ctx) + defer sess.Close() -func (sess *DBSession) publishAfterCommit(msg interface{}) { - sess.events = append(sess.events, msg) -} + withValue := context.WithValue(ctx, ContextSessionName, sess) -func newSession() *DBSession { - return &DBSession{Session: x.NewSession()} -} + err := fn(withValue) -func inTransaction(callback dbTransactionFunc) error { - return inTransactionWithRetry(callback, 0) -} - -func startSession(ctx context.Context) *DBSession { - value := ctx.Value(ContextSessionName) - var sess *DBSession - sess, ok := value.(*DBSession) - - if !ok { - newSess := newSession() - newSess.Begin() - return newSess + // special handling of database locked errors for sqlite, then we can retry 3 times + if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 { + if sqlError.Code == sqlite3.ErrLocked { + sess.Rollback() + time.Sleep(time.Millisecond * time.Duration(10)) + ss.log.Info("Database table locked, sleeping then retrying", "retry", retry) + return ss.inTransactionWithRetry(ctx, fn, retry+1) + } } - return sess -} + if err != nil { + sess.Rollback() + return err + } -func withDbSession(ctx context.Context, callback dbTransactionFunc) error { - sess := startSession(ctx) + if err = sess.Commit(); err != nil { + return err + } - return callback(sess) + if len(sess.events) > 0 { + for _, e := range sess.events { + if err = bus.Publish(e); err != nil { + ss.log.Error("Failed to publish event after commit", err) + } + } + } + + return nil } func inTransactionWithRetry(callback dbTransactionFunc, retry int) error { @@ -94,22 +95,6 @@ func inTransactionWithRetryCtx(ctx context.Context, callback dbTransactionFunc, return nil } -func (sess *DBSession) InsertId(bean interface{}) (int64, error) { - table := sess.DB().Mapper.Obj2Table(getTypeName(bean)) - - dialect.PreInsertId(table, sess.Session) - - id, err := sess.Session.InsertOne(bean) - - dialect.PostInsertId(table, sess.Session) - - return id, err -} - -func getTypeName(bean interface{}) (res string) { - t := reflect.TypeOf(bean) - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - return t.Name() +func inTransaction(callback dbTransactionFunc) error { + return inTransactionWithRetry(callback, 0) } diff --git a/pkg/services/sqlstore/user.go b/pkg/services/sqlstore/user.go index befc3a08401..f01cb84ad4f 100644 --- a/pkg/services/sqlstore/user.go +++ b/pkg/services/sqlstore/user.go @@ -31,7 +31,6 @@ func init() { bus.AddHandler("sql", DeleteUser) bus.AddHandler("sql", UpdateUserPermissions) bus.AddHandler("sql", SetUserHelpFlag) - bus.AddCtxHandler("sql", CreateUserCtx) }