mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Instrumentation: Improve instrumentation of server lock service (#55516)
Improve log and trace instrumentation of the server lock service. Closes #55455
This commit is contained in:
parent
2d38664fe6
commit
08d352a0ad
@ -6,12 +6,15 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
|
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ProvideService(sqlStore *sqlstore.SQLStore) *ServerLockService {
|
func ProvideService(sqlStore *sqlstore.SQLStore, tracer tracing.Tracer) *ServerLockService {
|
||||||
return &ServerLockService{
|
return &ServerLockService{
|
||||||
SQLStore: sqlStore,
|
SQLStore: sqlStore,
|
||||||
|
tracer: tracer,
|
||||||
log: log.New("infra.lockservice"),
|
log: log.New("infra.lockservice"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -21,6 +24,7 @@ func ProvideService(sqlStore *sqlstore.SQLStore) *ServerLockService {
|
|||||||
// them up (ie, use the same actionName for both of them).
|
// them up (ie, use the same actionName for both of them).
|
||||||
type ServerLockService struct {
|
type ServerLockService struct {
|
||||||
SQLStore *sqlstore.SQLStore
|
SQLStore *sqlstore.SQLStore
|
||||||
|
tracer tracing.Tracer
|
||||||
log log.Logger
|
log log.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -28,9 +32,18 @@ type ServerLockService struct {
|
|||||||
// `fn` function when successful. This should not be used at low internal. But services
|
// `fn` function when successful. This should not be used at low internal. But services
|
||||||
// that needs to be run once every ex 10m.
|
// that needs to be run once every ex 10m.
|
||||||
func (sl *ServerLockService) LockAndExecute(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error {
|
func (sl *ServerLockService) LockAndExecute(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error {
|
||||||
|
start := time.Now()
|
||||||
|
ctx, span := sl.tracer.Start(ctx, "ServerLockService.LockAndExecute")
|
||||||
|
span.SetAttributes("serverlock.actionName", actionName, attribute.Key("serverlock.actionName").String(actionName))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
ctxLogger := sl.log.FromContext(ctx)
|
||||||
|
ctxLogger.Debug("Start LockAndExecute", "actionName", actionName)
|
||||||
|
|
||||||
// gets or creates a lockable row
|
// gets or creates a lockable row
|
||||||
rowLock, err := sl.getOrCreate(ctx, actionName)
|
rowLock, err := sl.getOrCreate(ctx, actionName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -39,20 +52,25 @@ func (sl *ServerLockService) LockAndExecute(ctx context.Context, actionName stri
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// try to get lock based on rowLow version
|
// try to get lock based on rowLock version
|
||||||
acquiredLock, err := sl.acquireLock(ctx, rowLock)
|
acquiredLock, err := sl.acquireLock(ctx, rowLock)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if acquiredLock {
|
if acquiredLock {
|
||||||
fn(ctx)
|
sl.executeFunc(ctx, actionName, fn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctxLogger.Debug("LockAndExecute finished", "actionName", actionName, "acquiredLock", acquiredLock, "duration", time.Since(start))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sl *ServerLockService) acquireLock(ctx context.Context, serverLock *serverLock) (bool, error) {
|
func (sl *ServerLockService) acquireLock(ctx context.Context, serverLock *serverLock) (bool, error) {
|
||||||
|
ctx, span := sl.tracer.Start(ctx, "ServerLockService.acquireLock")
|
||||||
|
defer span.End()
|
||||||
var result bool
|
var result bool
|
||||||
|
|
||||||
err := sl.SQLStore.WithDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
|
err := sl.SQLStore.WithDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
|
||||||
@ -78,6 +96,9 @@ func (sl *ServerLockService) acquireLock(ctx context.Context, serverLock *server
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (sl *ServerLockService) getOrCreate(ctx context.Context, actionName string) (*serverLock, error) {
|
func (sl *ServerLockService) getOrCreate(ctx context.Context, actionName string) (*serverLock, error) {
|
||||||
|
ctx, span := sl.tracer.Start(ctx, "ServerLockService.getOrCreate")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
var result *serverLock
|
var result *serverLock
|
||||||
|
|
||||||
err := sl.SQLStore.WithTransactionalDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
|
err := sl.SQLStore.WithTransactionalDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
|
||||||
@ -115,26 +136,40 @@ func (sl *ServerLockService) getOrCreate(ctx context.Context, actionName string)
|
|||||||
// database is older than maxInterval, we will assume the lock as timeouted. The 'maxInterval' parameter should be so long
|
// database is older than maxInterval, we will assume the lock as timeouted. The 'maxInterval' parameter should be so long
|
||||||
// that is impossible for 2 processes to run at the same time.
|
// that is impossible for 2 processes to run at the same time.
|
||||||
func (sl *ServerLockService) LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error {
|
func (sl *ServerLockService) LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error {
|
||||||
|
start := time.Now()
|
||||||
|
ctx, span := sl.tracer.Start(ctx, "ServerLockService.LockExecuteAndRelease")
|
||||||
|
span.SetAttributes("serverlock.actionName", actionName, attribute.Key("serverlock.actionName").String(actionName))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
ctxLogger := sl.log.FromContext(ctx)
|
||||||
|
ctxLogger.Debug("Start LockExecuteAndRelease", "actionName", actionName)
|
||||||
|
|
||||||
err := sl.acquireForRelease(ctx, actionName, maxInterval)
|
err := sl.acquireForRelease(ctx, actionName, maxInterval)
|
||||||
// could not get the lock, returning
|
// could not get the lock, returning
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// execute!
|
sl.executeFunc(ctx, actionName, fn)
|
||||||
fn(ctx)
|
|
||||||
|
|
||||||
err = sl.releaseLock(ctx, actionName)
|
err = sl.releaseLock(ctx, actionName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sl.log.Error("Error releasing the lock.", err)
|
span.RecordError(err)
|
||||||
|
ctxLogger.Error("Failed to release the lock", "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctxLogger.Debug("LockExecuteAndRelease finished", "actionName", actionName, "duration", time.Since(start))
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// acquireForRelease will check if the lock is already on the database, if it is, will check with maxInterval if it is
|
// acquireForRelease will check if the lock is already on the database, if it is, will check with maxInterval if it is
|
||||||
// timeouted. Returns nil error if the lock was acquired correctly
|
// timeouted. Returns nil error if the lock was acquired correctly
|
||||||
func (sl *ServerLockService) acquireForRelease(ctx context.Context, actionName string, maxInterval time.Duration) error {
|
func (sl *ServerLockService) acquireForRelease(ctx context.Context, actionName string, maxInterval time.Duration) error {
|
||||||
|
ctx, span := sl.tracer.Start(ctx, "ServerLockService.acquireForRelease")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
// getting the lock - as the action name has a Unique constraint, this will fail if the lock is already on the database
|
// getting the lock - as the action name has a Unique constraint, this will fail if the lock is already on the database
|
||||||
err := sl.SQLStore.WithTransactionalDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
|
err := sl.SQLStore.WithTransactionalDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
|
||||||
// we need to find if the lock is in the database
|
// we need to find if the lock is in the database
|
||||||
@ -144,6 +179,8 @@ func (sl *ServerLockService) acquireForRelease(ctx context.Context, actionName s
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ctxLogger := sl.log.FromContext(ctx)
|
||||||
|
|
||||||
if len(lockRows) > 0 {
|
if len(lockRows) > 0 {
|
||||||
result := lockRows[0]
|
result := lockRows[0]
|
||||||
if sl.isLockWithinInterval(result, maxInterval) {
|
if sl.isLockWithinInterval(result, maxInterval) {
|
||||||
@ -157,7 +194,7 @@ func (sl *ServerLockService) acquireForRelease(ctx context.Context, actionName s
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if affected != 1 {
|
if affected != 1 {
|
||||||
sl.log.Error("Expected rows affected to be 1 if there was no error.", "actionName", actionName, "rowAffected", affected)
|
ctxLogger.Error("Expected rows affected to be 1 if there was no error", "actionName", actionName, "rowsAffected", affected)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -175,7 +212,7 @@ func (sl *ServerLockService) acquireForRelease(ctx context.Context, actionName s
|
|||||||
|
|
||||||
if affected != 1 {
|
if affected != 1 {
|
||||||
// this means that there was no error but there is something not working correctly
|
// this means that there was no error but there is something not working correctly
|
||||||
sl.log.Error("Expected rows affected to be 1 if there was no error.", "actionName", actionName, "rowAffected", affected)
|
ctxLogger.Error("Expected rows affected to be 1 if there was no error", "actionName", actionName, "rowsAffected", affected)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -186,6 +223,9 @@ func (sl *ServerLockService) acquireForRelease(ctx context.Context, actionName s
|
|||||||
// releaseLock will delete the row at the database. This is only intended to be used within the scope of LockExecuteAndRelease
|
// releaseLock will delete the row at the database. This is only intended to be used within the scope of LockExecuteAndRelease
|
||||||
// method, but not as to manually release a Lock
|
// method, but not as to manually release a Lock
|
||||||
func (sl *ServerLockService) releaseLock(ctx context.Context, actionName string) error {
|
func (sl *ServerLockService) releaseLock(ctx context.Context, actionName string) error {
|
||||||
|
ctx, span := sl.tracer.Start(ctx, "ServerLockService.releaseLock")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
err := sl.SQLStore.WithDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
|
err := sl.SQLStore.WithDbSession(ctx, func(dbSession *sqlstore.DBSession) error {
|
||||||
sql := `DELETE FROM server_lock WHERE operation_uid=? `
|
sql := `DELETE FROM server_lock WHERE operation_uid=? `
|
||||||
|
|
||||||
@ -195,7 +235,7 @@ func (sl *ServerLockService) releaseLock(ctx context.Context, actionName string)
|
|||||||
}
|
}
|
||||||
affected, err := res.RowsAffected()
|
affected, err := res.RowsAffected()
|
||||||
if affected != 1 {
|
if affected != 1 {
|
||||||
sl.log.Debug("Error releasing lock ", "actionName", actionName, "affected", affected)
|
sl.log.FromContext(ctx).Debug("Error releasing lock", "actionName", actionName, "rowsAffected", affected)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
@ -212,3 +252,16 @@ func (sl *ServerLockService) isLockWithinInterval(lock *serverLock, maxInterval
|
|||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (sl ServerLockService) executeFunc(ctx context.Context, actionName string, fn func(ctx context.Context)) {
|
||||||
|
start := time.Now()
|
||||||
|
ctx, span := sl.tracer.Start(ctx, "ServerLockService.executeFunc")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
|
ctxLogger := sl.log.FromContext(ctx)
|
||||||
|
ctxLogger.Debug("Start execution", "actionName", actionName)
|
||||||
|
|
||||||
|
fn(ctx)
|
||||||
|
|
||||||
|
ctxLogger.Debug("Execution finished", "actionName", actionName, "duration", time.Since(start))
|
||||||
|
}
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
|
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||||
"github.com/grafana/grafana/pkg/services/sqlstore"
|
"github.com/grafana/grafana/pkg/services/sqlstore"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -19,6 +20,7 @@ func createTestableServerLock(t *testing.T) *ServerLockService {
|
|||||||
|
|
||||||
return &ServerLockService{
|
return &ServerLockService{
|
||||||
SQLStore: store,
|
SQLStore: store,
|
||||||
|
tracer: tracing.InitializeTracerForTest(),
|
||||||
log: log.New("test-logger"),
|
log: log.New("test-logger"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user