mirror of
https://github.com/grafana/grafana.git
synced 2025-01-14 02:32:29 -06:00
Alerting: Fix flaky SQLITE_BUSY when migrating with provisioned dashboards (#76658)
* Alerting: Move migration from background service run to ngalert init sqlite database write contention between the migration's single transaction and dashboard provisioning's frequent commits was causing the migration to fail with SQLITE_BUSY/SQLITE_BUSY_SNAPSHOT on all retries. This is not a new issue for sqlite+grafana, but the discrepancy between the length of the transactions was causing it to be very consistent. In addition, since a failed migration has implications on the assumed correctness of the alertmanager and alert rule definition state, we cause a server shutdown on error. This can make e2e tests as well as some high-load provisioned sqlite installations flaky on startup. The correct fix for this is better transaction management across various services and is out of scope for this change as we're primarily interested in mitigating the current bout of server failures in e2e tests when using sqlite.
This commit is contained in:
parent
94fec65192
commit
c2efcdde09
@ -23,7 +23,6 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/login/authinfoservice"
|
||||
"github.com/grafana/grafana/pkg/services/loginattempt/loginattemptimpl"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/migration"
|
||||
"github.com/grafana/grafana/pkg/services/notifications"
|
||||
plugindashboardsservice "github.com/grafana/grafana/pkg/services/plugindashboards/service"
|
||||
"github.com/grafana/grafana/pkg/services/pluginsintegration/angulardetectorsprovider"
|
||||
@ -46,7 +45,7 @@ import (
|
||||
)
|
||||
|
||||
func ProvideBackgroundServiceRegistry(
|
||||
httpServer *api.HTTPServer, alertingMigrationService *migration.MigrationService, ng *ngalert.AlertNG, cleanup *cleanup.CleanUpService, live *live.GrafanaLive,
|
||||
httpServer *api.HTTPServer, ng *ngalert.AlertNG, cleanup *cleanup.CleanUpService, live *live.GrafanaLive,
|
||||
pushGateway *pushhttp.Gateway, notifications *notifications.NotificationService, pluginStore *pluginStore.Service,
|
||||
rendering *rendering.RenderingService, tokenService auth.UserTokenBackgroundService, tracing *tracing.TracingService,
|
||||
provisioning *provisioning.ProvisioningServiceImpl, alerting *alerting.AlertEngine, usageStats *uss.UsageStats,
|
||||
@ -68,7 +67,6 @@ func ProvideBackgroundServiceRegistry(
|
||||
) *BackgroundServiceRegistry {
|
||||
return NewBackgroundServiceRegistry(
|
||||
httpServer,
|
||||
alertingMigrationService,
|
||||
ng,
|
||||
cleanup,
|
||||
live,
|
||||
|
@ -1226,7 +1226,7 @@ func createOrg(t *testing.T, id int64) *org.Org {
|
||||
}
|
||||
|
||||
// teardown cleans the input tables between test cases.
|
||||
func teardown(t *testing.T, x *xorm.Engine, service *MigrationService) {
|
||||
func teardown(t *testing.T, x *xorm.Engine, service *migrationService) {
|
||||
_, err := x.Exec("DELETE from org")
|
||||
require.NoError(t, err)
|
||||
_, err = x.Exec("DELETE from alert")
|
||||
|
@ -41,7 +41,7 @@ type OrgMigration struct {
|
||||
}
|
||||
|
||||
// newOrgMigration creates a new OrgMigration for the given orgID.
|
||||
func (ms *MigrationService) newOrgMigration(orgID int64) *OrgMigration {
|
||||
func (ms *migrationService) newOrgMigration(orgID int64) *OrgMigration {
|
||||
return &OrgMigration{
|
||||
cfg: ms.cfg,
|
||||
log: ms.log.New("orgID", orgID),
|
||||
|
@ -19,7 +19,11 @@ const actionName = "alerting migration"
|
||||
//nolint:stylecheck
|
||||
var ForceMigrationError = fmt.Errorf("Grafana has already been migrated to Unified Alerting. Any alert rules created while using Unified Alerting will be deleted by rolling back. Set force_migration=true in your grafana.ini and restart Grafana to roll back and delete Unified Alerting configuration data.")
|
||||
|
||||
type MigrationService struct {
|
||||
type UpgradeService interface {
|
||||
Run(ctx context.Context) error
|
||||
}
|
||||
|
||||
type migrationService struct {
|
||||
lock *serverlock.ServerLockService
|
||||
cfg *setting.Cfg
|
||||
log log.Logger
|
||||
@ -35,8 +39,8 @@ func ProvideService(
|
||||
store db.DB,
|
||||
migrationStore migrationStore.Store,
|
||||
encryptionService secrets.Service,
|
||||
) (*MigrationService, error) {
|
||||
return &MigrationService{
|
||||
) (UpgradeService, error) {
|
||||
return &migrationService{
|
||||
lock: lock,
|
||||
log: log.New("ngalert.migration"),
|
||||
cfg: cfg,
|
||||
@ -49,9 +53,9 @@ func ProvideService(
|
||||
// Run starts the migration. This will either migrate from legacy alerting to unified alerting or revert the migration.
|
||||
// If the migration status in the kvstore is not set and unified alerting is enabled, the migration will be executed.
|
||||
// If the migration status in the kvstore is set and both unified alerting is disabled and ForceMigration is set to true, the migration will be reverted.
|
||||
func (ms *MigrationService) Run(ctx context.Context) error {
|
||||
func (ms *migrationService) Run(ctx context.Context) error {
|
||||
var errMigration error
|
||||
errLock := ms.lock.LockExecuteAndRelease(ctx, actionName, time.Minute*10, func(context.Context) {
|
||||
errLock := ms.lock.LockExecuteAndRelease(ctx, actionName, time.Minute*10, func(ctx context.Context) {
|
||||
ms.log.Info("Starting")
|
||||
errMigration = ms.store.InTransaction(ctx, func(ctx context.Context) error {
|
||||
migrated, err := ms.migrationStore.IsMigrated(ctx)
|
||||
@ -110,13 +114,8 @@ func (ms *MigrationService) Run(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsDisabled returns true if the cfg is nil.
|
||||
func (ms *MigrationService) IsDisabled() bool {
|
||||
return ms.cfg == nil
|
||||
}
|
||||
|
||||
// migrateAllOrgs executes the migration for all orgs.
|
||||
func (ms *MigrationService) migrateAllOrgs(ctx context.Context) error {
|
||||
func (ms *migrationService) migrateAllOrgs(ctx context.Context) error {
|
||||
orgs, err := ms.migrationStore.GetAllOrgs(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get orgs: %w", err)
|
||||
@ -130,7 +129,7 @@ func (ms *MigrationService) migrateAllOrgs(ctx context.Context) error {
|
||||
|
||||
err = om.migrationStore.SetOrgMigrationState(ctx, o.ID, om.state)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("set org migration state: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -1,10 +1,10 @@
|
||||
package migration
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log/logtest"
|
||||
"github.com/grafana/grafana/pkg/infra/serverlock"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
migrationStore "github.com/grafana/grafana/pkg/services/ngalert/migration/store"
|
||||
@ -13,17 +13,30 @@ import (
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
|
||||
func NewTestMigrationService(t *testing.T, sqlStore *sqlstore.SQLStore, cfg *setting.Cfg) *MigrationService {
|
||||
func NewTestMigrationService(t *testing.T, sqlStore *sqlstore.SQLStore, cfg *setting.Cfg) *migrationService {
|
||||
t.Helper()
|
||||
if cfg == nil {
|
||||
cfg = setting.NewCfg()
|
||||
}
|
||||
ms, err := ProvideService(
|
||||
serverlock.ProvideService(sqlStore, tracing.InitializeTracerForTest()),
|
||||
cfg,
|
||||
sqlStore,
|
||||
migrationStore.NewTestMigrationStore(t, sqlStore, cfg),
|
||||
fake_secrets.NewFakeSecretsService(),
|
||||
)
|
||||
require.NoError(t, err)
|
||||
return ms
|
||||
return &migrationService{
|
||||
lock: serverlock.ProvideService(sqlStore, tracing.InitializeTracerForTest()),
|
||||
log: &logtest.Fake{},
|
||||
cfg: cfg,
|
||||
store: sqlStore,
|
||||
migrationStore: migrationStore.NewTestMigrationStore(t, sqlStore, cfg),
|
||||
encryptionService: fake_secrets.NewFakeSecretsService(),
|
||||
}
|
||||
}
|
||||
|
||||
func NewFakeMigrationService(t testing.TB) *fakeMigrationService {
|
||||
t.Helper()
|
||||
return &fakeMigrationService{}
|
||||
}
|
||||
|
||||
type fakeMigrationService struct {
|
||||
}
|
||||
|
||||
func (ms *fakeMigrationService) Run(_ context.Context) error {
|
||||
// Do nothing.
|
||||
return nil
|
||||
}
|
||||
|
@ -24,10 +24,12 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/folder"
|
||||
"github.com/grafana/grafana/pkg/services/guardian"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/api"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/image"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/migration"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/provisioning"
|
||||
@ -68,6 +70,10 @@ func ProvideService(
|
||||
pluginsStore pluginstore.Store,
|
||||
tracer tracing.Tracer,
|
||||
ruleStore *store.DBstore,
|
||||
upgradeService migration.UpgradeService,
|
||||
|
||||
// This is necessary to ensure the guardian provider is initialized before we run the migration.
|
||||
_ *guardian.Provider,
|
||||
) (*AlertNG, error) {
|
||||
ng := &AlertNG{
|
||||
Cfg: cfg,
|
||||
@ -94,9 +100,17 @@ func ProvideService(
|
||||
pluginsStore: pluginsStore,
|
||||
tracer: tracer,
|
||||
store: ruleStore,
|
||||
upgradeService: upgradeService,
|
||||
}
|
||||
|
||||
if ng.IsDisabled() {
|
||||
// Migration is called even if UA is disabled. If UA is disabled, this will do nothing except handle logic around
|
||||
// reverting the migration.
|
||||
err := ng.upgradeService.Run(context.Background())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !ng.shouldRun() {
|
||||
return ng, nil
|
||||
}
|
||||
|
||||
@ -142,6 +156,8 @@ type AlertNG struct {
|
||||
bus bus.Bus
|
||||
pluginsStore pluginstore.Store
|
||||
tracer tracing.Tracer
|
||||
|
||||
upgradeService migration.UpgradeService
|
||||
}
|
||||
|
||||
func (ng *AlertNG) init() error {
|
||||
@ -316,8 +332,16 @@ func subscribeToFolderChanges(logger log.Logger, bus bus.Bus, dbStore api.RuleSt
|
||||
})
|
||||
}
|
||||
|
||||
// shouldRun determines if AlertNG should init or run anything more than just the migration.
|
||||
func (ng *AlertNG) shouldRun() bool {
|
||||
return ng.Cfg.UnifiedAlerting.IsEnabled()
|
||||
}
|
||||
|
||||
// Run starts the scheduler and Alertmanager.
|
||||
func (ng *AlertNG) Run(ctx context.Context) error {
|
||||
if !ng.shouldRun() {
|
||||
return nil
|
||||
}
|
||||
ng.Log.Debug("Starting")
|
||||
|
||||
ng.stateManager.Warm(ctx, ng.store)
|
||||
@ -341,10 +365,7 @@ func (ng *AlertNG) Run(ctx context.Context) error {
|
||||
|
||||
// IsDisabled returns true if the alerting service is disabled for this instance.
|
||||
func (ng *AlertNG) IsDisabled() bool {
|
||||
if ng.Cfg == nil {
|
||||
return true
|
||||
}
|
||||
return !ng.Cfg.UnifiedAlerting.IsEnabled()
|
||||
return ng.Cfg == nil
|
||||
}
|
||||
|
||||
// GetHooks returns a facility for replacing handlers for paths. The handler hook for a path
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/folder/folderimpl"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/migration"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/store"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/testutil"
|
||||
@ -68,7 +69,7 @@ func SetupTestEnv(tb testing.TB, baseInterval time.Duration) (*ngalert.AlertNG,
|
||||
ng, err := ngalert.ProvideService(
|
||||
cfg, featuremgmt.WithFeatures(), nil, nil, routing.NewRouteRegister(), sqlStore, nil, nil, nil, quotatest.New(false, nil),
|
||||
secretsService, nil, m, folderService, ac, &dashboards.FakeDashboardService{}, nil, bus, ac,
|
||||
annotationstest.NewFakeAnnotationsRepo(), &pluginstore.FakePluginStore{}, tracer, ruleStore,
|
||||
annotationstest.NewFakeAnnotationsRepo(), &pluginstore.FakePluginStore{}, tracer, ruleStore, migration.NewFakeMigrationService(tb), nil,
|
||||
)
|
||||
require.NoError(tb, err)
|
||||
return ng, &store.DBstore{
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/folder/foldertest"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/migration"
|
||||
ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
ngstore "github.com/grafana/grafana/pkg/services/ngalert/store"
|
||||
"github.com/grafana/grafana/pkg/services/org"
|
||||
@ -485,7 +486,7 @@ func setupEnv(t *testing.T, sqlStore *sqlstore.SQLStore, b bus.Bus, quotaService
|
||||
_, err = ngalert.ProvideService(
|
||||
sqlStore.Cfg, featuremgmt.WithFeatures(), nil, nil, routing.NewRouteRegister(), sqlStore, nil, nil, nil, quotaService,
|
||||
secretsService, nil, m, &foldertest.FakeService{}, &acmock.Mock{}, &dashboards.FakeDashboardService{}, nil, b, &acmock.Mock{},
|
||||
annotationstest.NewFakeAnnotationsRepo(), &pluginstore.FakePluginStore{}, tracer, ruleStore,
|
||||
annotationstest.NewFakeAnnotationsRepo(), &pluginstore.FakePluginStore{}, tracer, ruleStore, migration.NewFakeMigrationService(t), nil,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
_, err = storesrv.ProvideService(sqlStore, featuremgmt.WithFeatures(), sqlStore.Cfg, quotaService, storesrv.ProvideSystemUsersService())
|
||||
|
Loading…
Reference in New Issue
Block a user