From 263572813a493ac34618075bedee5c0bac92df19 Mon Sep 17 00:00:00 2001 From: bergquist Date: Thu, 7 Jun 2018 18:02:28 +0200 Subject: [PATCH] replace begin/end with wrapper function --- pkg/bus/bus.go | 40 +++++++++++++--------------- pkg/services/sqlstore/shared.go | 13 +++++---- pkg/services/sqlstore/sqlstore.go | 44 ++++++++++++++++++++----------- 3 files changed, 52 insertions(+), 45 deletions(-) diff --git a/pkg/bus/bus.go b/pkg/bus/bus.go index 98279678777..0f10dfd9b17 100644 --- a/pkg/bus/bus.go +++ b/pkg/bus/bus.go @@ -12,9 +12,8 @@ type Msg interface{} var ErrHandlerNotFound = errors.New("handler not found") -type TransactionManager interface { - Begin(ctx context.Context) (context.Context, error) - End(ctx context.Context, err error) error +type TransactionWrapper interface { + Wrapp(ctx context.Context, fn func(ctx context.Context) error) error } type Bus interface { @@ -25,7 +24,7 @@ type Bus interface { // InTransaction starts a transaction and store it in the context. // The caller can then pass a function with multiple DispatchCtx calls that // all will be executed in the same transaction. InTransaction will rollback if the - // callback returns an error.s + // callback returns an error. InTransaction(ctx context.Context, fn func(ctx context.Context) error) error AddHandler(handler HandlerFunc) @@ -36,19 +35,11 @@ type Bus interface { // SetTransactionManager allows the user to replace the internal // noop TransactionManager that is responsible for manageing // transactions in `InTransaction` - SetTransactionManager(tm TransactionManager) + SetTransactionManager(tm TransactionWrapper) } func (b *InProcBus) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { - ctxWithTran, err := b.transactionManager.Begin(ctx) - if err != nil { - return err - } - - err = fn(ctxWithTran) - b.transactionManager.End(ctxWithTran, err) - - return err + return b.transactionWrapper.Wrapp(ctx, fn) } type InProcBus struct { @@ -56,7 +47,7 @@ type InProcBus struct { listeners map[string][]HandlerFunc wildcardListeners []HandlerFunc - transactionManager TransactionManager + transactionWrapper TransactionWrapper } // temp stuff, not sure how to handle bus instance, and init yet @@ -68,7 +59,7 @@ func New() Bus { bus.listeners = make(map[string][]HandlerFunc) bus.wildcardListeners = make([]HandlerFunc, 0) - bus.transactionManager = &NoopTransactionManager{} + bus.transactionWrapper = &noopTransactionManager{} return bus } @@ -78,12 +69,12 @@ func GetBus() Bus { return globalBus } -func SetTransactionManager(tm TransactionManager) { +func SetTransactionManager(tm TransactionWrapper) { globalBus.SetTransactionManager(tm) } -func (b *InProcBus) SetTransactionManager(tm TransactionManager) { - b.transactionManager = tm +func (b *InProcBus) SetTransactionManager(tm TransactionWrapper) { + b.transactionWrapper = tm } func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error { @@ -208,6 +199,10 @@ func Publish(msg Msg) error { return globalBus.Publish(msg) } +// InTransaction starts a transaction and store it in the context. +// The caller can then pass a function with multiple DispatchCtx calls that +// all will be executed in the same transaction. InTransaction will rollback if the +// callback returns an error. func InTransaction(ctx context.Context, fn func(ctx context.Context) error) error { return globalBus.InTransaction(ctx, fn) } @@ -216,7 +211,8 @@ func ClearBusHandlers() { globalBus = New() } -type NoopTransactionManager struct{} +type noopTransactionManager struct{} -func (*NoopTransactionManager) Begin(ctx context.Context) (context.Context, error) { return ctx, nil } -func (*NoopTransactionManager) End(ctx context.Context, err error) error { return err } +func (*noopTransactionManager) Wrapp(ctx context.Context, fn func(ctx context.Context) error) error { + return nil +} diff --git a/pkg/services/sqlstore/shared.go b/pkg/services/sqlstore/shared.go index 3ccb92f010f..7928f22b9f3 100644 --- a/pkg/services/sqlstore/shared.go +++ b/pkg/services/sqlstore/shared.go @@ -32,17 +32,16 @@ func inTransaction(callback dbTransactionFunc) error { func startSession(ctx context.Context) *DBSession { value := ctx.Value(ContextSessionName) - var sess *xorm.Session - sess, ok := value.(*xorm.Session) + var sess *DBSession + sess, ok := value.(*DBSession) if !ok { - return newSession() + newSess := newSession() + newSess.Begin() + return newSess } - old := newSession() - old.Session = sess - - return old + return sess } func withDbSession(ctx context.Context, callback dbTransactionFunc) error { diff --git a/pkg/services/sqlstore/sqlstore.go b/pkg/services/sqlstore/sqlstore.go index bfe462f4d91..d6268f3a2aa 100644 --- a/pkg/services/sqlstore/sqlstore.go +++ b/pkg/services/sqlstore/sqlstore.go @@ -2,7 +2,6 @@ package sqlstore import ( "context" - "errors" "fmt" "net/url" "os" @@ -26,6 +25,7 @@ import ( "github.com/go-xorm/xorm" _ "github.com/lib/pq" _ "github.com/mattn/go-sqlite3" + sqlite3 "github.com/mattn/go-sqlite3" _ "github.com/grafana/grafana/pkg/tsdb/mssql" ) @@ -94,37 +94,49 @@ func (ss *SqlStore) Init() error { return ss.ensureAdminUser() } +// SQLTransactionManager begin/end transaction type SQLTransactionManager struct { engine *xorm.Engine } -func (stm *SQLTransactionManager) Begin(ctx context.Context) (context.Context, error) { - sess := stm.engine.NewSession() - err := sess.Begin() - if err != nil { - return ctx, err - } +func (stm *SQLTransactionManager) Wrapp(ctx context.Context, fn func(ctx context.Context) error) error { + return stm.wrappInternal(ctx, fn, 0) +} + +func (stm *SQLTransactionManager) wrappInternal(ctx context.Context, fn func(ctx context.Context) error, retry int) error { + sess := startSession(ctx) + defer sess.Close() withValue := context.WithValue(ctx, ContextSessionName, sess) - return withValue, nil -} + err := fn(withValue) -func (stm *SQLTransactionManager) End(ctx context.Context, err error) error { - value := ctx.Value(ContextSessionName) - sess, ok := value.(*xorm.Session) - if !ok { - return errors.New("context is missing transaction") + // special handling of database locked errors for sqlite, then we can retry 3 times + if sqlError, ok := err.(sqlite3.Error); ok && retry < 5 { + if sqlError.Code == sqlite3.ErrLocked { + sess.Rollback() + time.Sleep(time.Millisecond * time.Duration(10)) + sqlog.Info("Database table locked, sleeping then retrying", "retry", retry) + return stm.wrappInternal(ctx, fn, retry+1) + } } if err != nil { sess.Rollback() return err + } else if err = sess.Commit(); err != nil { + return err } - defer sess.Close() + if len(sess.events) > 0 { + for _, e := range sess.events { + if err = bus.Publish(e); err != nil { + log.Error(3, "Failed to publish event after commit", err) + } + } + } - return sess.Commit() + return nil } func (ss *SqlStore) ensureAdminUser() error {