From 85a954cd81ea7b8632f0781518a03b63a17e05b5 Mon Sep 17 00:00:00 2001 From: Yuri Tseretyan Date: Tue, 14 Mar 2023 18:02:51 -0400 Subject: [PATCH] Alerting: Update scheduler to get updates only from database (#64635) * stop using the scheduler's Update and Delete methods all communication must be via the database * update scheduler's registry to calculate diff before re-setting the cache * update fetcher to return the diff generated by registry * update processTick to update rule eval routine if the rule was updated and it is not going to be evaluated at this tick. * remove references to the scheduler from api package * remove unused methods in the scheduler --- pkg/services/ngalert/api/api.go | 3 - pkg/services/ngalert/api/api_ruler.go | 23 ---- pkg/services/ngalert/api/api_ruler_test.go | 98 ++++------------ pkg/services/ngalert/ngalert.go | 15 +-- pkg/services/ngalert/ngalert_test.go | 21 +--- pkg/services/ngalert/schedule/fetcher.go | 20 ++-- pkg/services/ngalert/schedule/registry.go | 50 +++++--- .../ngalert/schedule/registry_test.go | 50 ++++++++ pkg/services/ngalert/schedule/schedule.go | 55 +++++---- .../ngalert/schedule/schedule_mock.go | 56 --------- .../ngalert/schedule/schedule_unit_test.go | 109 ++++++++---------- 11 files changed, 202 insertions(+), 298 deletions(-) delete mode 100644 pkg/services/ngalert/schedule/schedule_mock.go diff --git a/pkg/services/ngalert/api/api.go b/pkg/services/ngalert/api/api.go index 1655416a28c..08aa14f2017 100644 --- a/pkg/services/ngalert/api/api.go +++ b/pkg/services/ngalert/api/api.go @@ -18,7 +18,6 @@ import ( "github.com/grafana/grafana/pkg/services/ngalert/models" "github.com/grafana/grafana/pkg/services/ngalert/notifier" "github.com/grafana/grafana/pkg/services/ngalert/provisioning" - "github.com/grafana/grafana/pkg/services/ngalert/schedule" "github.com/grafana/grafana/pkg/services/ngalert/sender" "github.com/grafana/grafana/pkg/services/ngalert/state" "github.com/grafana/grafana/pkg/services/ngalert/store" @@ 
-66,7 +65,6 @@ type API struct { DatasourceService datasources.DataSourceService RouteRegister routing.RouteRegister QuotaService quota.Service - Schedule schedule.ScheduleService TransactionManager provisioning.TransactionManager ProvenanceStore provisioning.ProvisioningStore RuleStore RuleStore @@ -116,7 +114,6 @@ func (api *API) RegisterAPIEndpoints(m *metrics.API) { &RulerSrv{ conditionValidator: api.EvaluatorFactory, QuotaService: api.QuotaService, - scheduleService: api.Schedule, store: api.RuleStore, provenanceStore: api.ProvenanceStore, xactManager: api.TransactionManager, diff --git a/pkg/services/ngalert/api/api_ruler.go b/pkg/services/ngalert/api/api_ruler.go index 6f73cd98fce..fae3c566668 100644 --- a/pkg/services/ngalert/api/api_ruler.go +++ b/pkg/services/ngalert/api/api_ruler.go @@ -20,7 +20,6 @@ import ( "github.com/grafana/grafana/pkg/services/ngalert/eval" ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models" "github.com/grafana/grafana/pkg/services/ngalert/provisioning" - "github.com/grafana/grafana/pkg/services/ngalert/schedule" "github.com/grafana/grafana/pkg/services/ngalert/store" "github.com/grafana/grafana/pkg/services/quota" "github.com/grafana/grafana/pkg/setting" @@ -37,7 +36,6 @@ type RulerSrv struct { provenanceStore provisioning.ProvisioningStore store RuleStore QuotaService quota.Service - scheduleService schedule.ScheduleService log log.Logger cfg *setting.UnifiedAlertingSettings ac accesscontrol.AccessControl @@ -144,12 +142,6 @@ func (srv RulerSrv) RouteDeleteAlertRules(c *contextmodel.ReqContext, namespaceT } return ErrResp(http.StatusInternalServerError, err, "failed to delete rule group") } - - logger.Debug("rules have been deleted from the store. updating scheduler") - for _, ruleKeys := range deletedGroups { - srv.scheduleService.DeleteAlertRule(ruleKeys...) 
- } - return response.JSON(http.StatusAccepted, util.DynMap{"message": "rules deleted"}) } @@ -421,21 +413,6 @@ func (srv RulerSrv) updateAlertRulesInGroup(c *contextmodel.ReqContext, groupKey return ErrResp(http.StatusInternalServerError, err, "failed to update rule group") } - for _, rule := range finalChanges.Update { - srv.scheduleService.UpdateAlertRule(ngmodels.AlertRuleKey{ - OrgID: c.SignedInUser.OrgID, - UID: rule.Existing.UID, - }, rule.Existing.Version+1, rule.New.IsPaused) - } - - if len(finalChanges.Delete) > 0 { - keys := make([]ngmodels.AlertRuleKey, 0, len(finalChanges.Delete)) - for _, rule := range finalChanges.Delete { - keys = append(keys, rule.GetKey()) - } - srv.scheduleService.DeleteAlertRule(keys...) - } - if finalChanges.IsEmpty() { return response.JSON(http.StatusAccepted, util.DynMap{"message": "no changes detected in the rule group"}) } diff --git a/pkg/services/ngalert/api/api_ruler_test.go b/pkg/services/ngalert/api/api_ruler_test.go index bace6d77e34..76bbf31e160 100644 --- a/pkg/services/ngalert/api/api_ruler_test.go +++ b/pkg/services/ngalert/api/api_ruler_test.go @@ -22,7 +22,6 @@ import ( apimodels "github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions" "github.com/grafana/grafana/pkg/services/ngalert/models" "github.com/grafana/grafana/pkg/services/ngalert/provisioning" - "github.com/grafana/grafana/pkg/services/ngalert/schedule" "github.com/grafana/grafana/pkg/services/ngalert/store" "github.com/grafana/grafana/pkg/services/ngalert/tests/fakes" "github.com/grafana/grafana/pkg/services/org" @@ -47,7 +46,7 @@ func TestRouteDeleteAlertRules(t *testing.T) { return result } - assertRulesDeleted := func(t *testing.T, expectedRules []*models.AlertRule, ruleStore *fakes.RuleStore, scheduler *schedule.FakeScheduleService) { + assertRulesDeleted := func(t *testing.T, expectedRules []*models.AlertRule, ruleStore *fakes.RuleStore) { deleteCommands := getRecordedCommand(ruleStore) require.Len(t, deleteCommands, 1) cmd := 
deleteCommands[0] @@ -56,20 +55,6 @@ func TestRouteDeleteAlertRules(t *testing.T) { for _, rule := range expectedRules { require.Containsf(t, actualUIDs, rule.UID, "Rule %s was expected to be deleted but it wasn't", rule.UID) } - - notDeletedRules := make(map[models.AlertRuleKey]struct{}, len(expectedRules)) - for _, rule := range expectedRules { - notDeletedRules[rule.GetKey()] = struct{}{} - } - for _, call := range scheduler.Calls { - require.Equal(t, "DeleteAlertRule", call.Method) - keys, ok := call.Arguments.Get(0).([]models.AlertRuleKey) - require.Truef(t, ok, "Expected AlertRuleKey but got something else") - for _, key := range keys { - delete(notDeletedRules, key) - } - } - require.Emptyf(t, notDeletedRules, "Not all rules were deleted") } orgID := rand.Int63() @@ -89,14 +74,10 @@ func TestRouteDeleteAlertRules(t *testing.T) { ruleStore := initFakeRuleStore(t) ruleStore.PutRule(context.Background(), models.GenerateAlertRulesSmallNonEmpty(models.AlertRuleGen(withOrgID(orgID), withNamespace(folder)))...) - scheduler := &schedule.FakeScheduleService{} - scheduler.On("DeleteAlertRule", mock.Anything) - request := createRequestContext(orgID, org.RoleViewer, nil) - response := createService(ac, ruleStore, scheduler).RouteDeleteAlertRules(request, folder.Title, "") + response := createService(ac, ruleStore).RouteDeleteAlertRules(request, folder.Title, "") require.Equalf(t, 401, response.Status(), "Expected 401 but got %d: %v", response.Status(), string(response.Body())) - scheduler.AssertNotCalled(t, "DeleteAlertRule") require.Empty(t, getRecordedCommand(ruleStore)) }) t.Run("editor should be able to delete all non-provisioned rules in folder", func(t *testing.T) { @@ -104,14 +85,10 @@ func TestRouteDeleteAlertRules(t *testing.T) { rulesInFolder := models.GenerateAlertRulesSmallNonEmpty(models.AlertRuleGen(withOrgID(orgID), withNamespace(folder))) ruleStore.PutRule(context.Background(), rulesInFolder...) 
- scheduler := &schedule.FakeScheduleService{} - scheduler.On("DeleteAlertRule", mock.Anything) - request := createRequestContext(orgID, org.RoleEditor, nil) - response := createService(ac, ruleStore, scheduler).RouteDeleteAlertRules(request, folder.Title, "") + response := createService(ac, ruleStore).RouteDeleteAlertRules(request, folder.Title, "") require.Equalf(t, 202, response.Status(), "Expected 202 but got %d: %v", response.Status(), string(response.Body())) - assertRulesDeleted(t, rulesInFolder, ruleStore, scheduler) }) t.Run("editor should be able to delete rules group if it is not provisioned", func(t *testing.T) { groupName := util.GenerateShortUID() @@ -124,26 +101,19 @@ func TestRouteDeleteAlertRules(t *testing.T) { // rules in the same group but different folder ruleStore.PutRule(context.Background(), models.GenerateAlertRulesSmallNonEmpty(models.AlertRuleGen(withOrgID(orgID), withGroup(groupName)))...) - scheduler := &schedule.FakeScheduleService{} - scheduler.On("DeleteAlertRule", mock.Anything).Return() - request := createRequestContext(orgID, org.RoleEditor, nil) - response := createService(ac, ruleStore, scheduler).RouteDeleteAlertRules(request, folder.Title, groupName) + response := createService(ac, ruleStore).RouteDeleteAlertRules(request, folder.Title, groupName) require.Equalf(t, 202, response.Status(), "Expected 202 but got %d: %v", response.Status(), string(response.Body())) - assertRulesDeleted(t, rulesInFolderInGroup, ruleStore, scheduler) + assertRulesDeleted(t, rulesInFolderInGroup, ruleStore) }) t.Run("should return 202 if folder is empty", func(t *testing.T) { ruleStore := initFakeRuleStore(t) - scheduler := &schedule.FakeScheduleService{} - scheduler.On("DeleteAlertRule", mock.Anything) - requestCtx := createRequestContext(orgID, org.RoleEditor, nil) - response := createService(ac, ruleStore, scheduler).RouteDeleteAlertRules(requestCtx, folder.Title, "") + response := createService(ac, ruleStore).RouteDeleteAlertRules(requestCtx, 
folder.Title, "") require.Equalf(t, 202, response.Status(), "Expected 202 but got %d: %v", response.Status(), string(response.Body())) - scheduler.AssertNotCalled(t, "DeleteAlertRule") require.Empty(t, getRecordedCommand(ruleStore)) }) }) @@ -155,25 +125,18 @@ func TestRouteDeleteAlertRules(t *testing.T) { ruleStore := initFakeRuleStore(t) ruleStore.PutRule(context.Background(), models.GenerateAlertRulesSmallNonEmpty(models.AlertRuleGen(withOrgID(orgID), withNamespace(folder)))...) - scheduler := &schedule.FakeScheduleService{} - scheduler.On("DeleteAlertRule", mock.Anything).Panic("should not be called") - ac := acMock.New() request := createRequestContext(orgID, "None", nil) - response := createService(ac, ruleStore, scheduler).RouteDeleteAlertRules(request, folder.Title, "") + response := createService(ac, ruleStore).RouteDeleteAlertRules(request, folder.Title, "") require.Equalf(t, 401, response.Status(), "Expected 401 but got %d: %v", response.Status(), string(response.Body())) - scheduler.AssertNotCalled(t, "DeleteAlertRule") require.Empty(t, getRecordedCommand(ruleStore)) }) t.Run("delete only non-provisioned groups that user is authorized", func(t *testing.T) { ruleStore := initFakeRuleStore(t) provisioningStore := provisioning.NewFakeProvisioningStore() - scheduler := &schedule.FakeScheduleService{} - scheduler.On("DeleteAlertRule", mock.Anything) - authorizedRulesInFolder := models.GenerateAlertRulesSmallNonEmpty(models.AlertRuleGen(withOrgID(orgID), withNamespace(folder), withGroup("authz_"+util.GenerateShortUID()))) provisionedRulesInFolder := models.GenerateAlertRulesSmallNonEmpty(models.AlertRuleGen(withOrgID(orgID), withNamespace(folder), withGroup("provisioned_"+util.GenerateShortUID()))) @@ -187,10 +150,10 @@ func TestRouteDeleteAlertRules(t *testing.T) { ac := acMock.New().WithPermissions(createPermissionsForRules(append(authorizedRulesInFolder, provisionedRulesInFolder...))) - response := createServiceWithProvenanceStore(ac, ruleStore, scheduler, 
provisioningStore).RouteDeleteAlertRules(requestCtx, folder.Title, "") + response := createServiceWithProvenanceStore(ac, ruleStore, provisioningStore).RouteDeleteAlertRules(requestCtx, folder.Title, "") require.Equalf(t, 202, response.Status(), "Expected 202 but got %d: %v", response.Status(), string(response.Body())) - assertRulesDeleted(t, authorizedRulesInFolder, ruleStore, scheduler) + assertRulesDeleted(t, authorizedRulesInFolder, ruleStore) }) t.Run("return 400 if all rules user can access are provisioned", func(t *testing.T) { ruleStore := initFakeRuleStore(t) @@ -204,15 +167,11 @@ func TestRouteDeleteAlertRules(t *testing.T) { // more rules in the same namespace but user does not have access to them ruleStore.PutRule(context.Background(), models.GenerateAlertRulesSmallNonEmpty(models.AlertRuleGen(withOrgID(orgID), withNamespace(folder), withGroup(util.GenerateShortUID())))...) - scheduler := &schedule.FakeScheduleService{} - scheduler.On("DeleteAlertRule", mock.Anything) - ac := acMock.New().WithPermissions(createPermissionsForRules(provisionedRulesInFolder)) - response := createServiceWithProvenanceStore(ac, ruleStore, scheduler, provisioningStore).RouteDeleteAlertRules(requestCtx, folder.Title, "") + response := createServiceWithProvenanceStore(ac, ruleStore, provisioningStore).RouteDeleteAlertRules(requestCtx, folder.Title, "") require.Equalf(t, 400, response.Status(), "Expected 400 but got %d: %v", response.Status(), string(response.Body())) - scheduler.AssertNotCalled(t, "DeleteAlertRule") require.Empty(t, getRecordedCommand(ruleStore)) }) }) @@ -226,15 +185,11 @@ func TestRouteDeleteAlertRules(t *testing.T) { // more rules in the same group but user is not authorized to access them ruleStore.PutRule(context.Background(), models.GenerateAlertRulesSmallNonEmpty(models.AlertRuleGen(withOrgID(orgID), withNamespace(folder), withGroup(groupName)))...) 
- scheduler := &schedule.FakeScheduleService{} - scheduler.On("DeleteAlertRule", mock.Anything) - ac := acMock.New().WithPermissions(createPermissionsForRules(authorizedRulesInGroup)) - response := createService(ac, ruleStore, scheduler).RouteDeleteAlertRules(requestCtx, folder.Title, groupName) + response := createService(ac, ruleStore).RouteDeleteAlertRules(requestCtx, folder.Title, groupName) require.Equalf(t, 401, response.Status(), "Expected 401 but got %d: %v", response.Status(), string(response.Body())) - scheduler.AssertNotCalled(t, "DeleteAlertRule", mock.Anything) deleteCommands := getRecordedCommand(ruleStore) require.Empty(t, deleteCommands) }) @@ -248,15 +203,11 @@ func TestRouteDeleteAlertRules(t *testing.T) { ruleStore.PutRule(context.Background(), provisionedRulesInFolder...) - scheduler := &schedule.FakeScheduleService{} - scheduler.On("DeleteAlertRule", mock.Anything) - ac := acMock.New().WithPermissions(createPermissionsForRules(provisionedRulesInFolder)) - response := createServiceWithProvenanceStore(ac, ruleStore, scheduler, provisioningStore).RouteDeleteAlertRules(requestCtx, folder.Title, groupName) + response := createServiceWithProvenanceStore(ac, ruleStore, provisioningStore).RouteDeleteAlertRules(requestCtx, folder.Title, groupName) require.Equalf(t, 400, response.Status(), "Expected 400 but got %d: %v", response.Status(), string(response.Body())) - scheduler.AssertNotCalled(t, "DeleteAlertRule", mock.Anything) deleteCommands := getRecordedCommand(ruleStore) require.Empty(t, deleteCommands) }) @@ -277,7 +228,7 @@ func TestRouteGetNamespaceRulesConfig(t *testing.T) { ac := acMock.New().WithPermissions(createPermissionsForRules(expectedRules)) req := createRequestContext(orgID, "", nil) - response := createService(ac, ruleStore, nil).RouteGetNamespaceRulesConfig(req, folder.Title) + response := createService(ac, ruleStore).RouteGetNamespaceRulesConfig(req, folder.Title) require.Equal(t, http.StatusAccepted, response.Status()) result := 
&apimodels.NamespaceConfigResponse{} @@ -312,7 +263,7 @@ func TestRouteGetNamespaceRulesConfig(t *testing.T) { ac := acMock.New().WithDisabled() req := createRequestContext(orgID, org.RoleViewer, nil) - response := createService(ac, ruleStore, nil).RouteGetNamespaceRulesConfig(req, folder.Title) + response := createService(ac, ruleStore).RouteGetNamespaceRulesConfig(req, folder.Title) require.Equal(t, http.StatusAccepted, response.Status()) result := &apimodels.NamespaceConfigResponse{} @@ -345,7 +296,7 @@ func TestRouteGetNamespaceRulesConfig(t *testing.T) { ruleStore.PutRule(context.Background(), expectedRules...) ac := acMock.New().WithDisabled() - svc := createService(ac, ruleStore, nil) + svc := createService(ac, ruleStore) // add provenance to the first generated rule rule := &models.AlertRule{ @@ -389,7 +340,7 @@ func TestRouteGetNamespaceRulesConfig(t *testing.T) { ruleStore.PutRule(context.Background(), expectedRules...) ac := acMock.New().WithDisabled() - response := createService(ac, ruleStore, nil).RouteGetNamespaceRulesConfig(createRequestContext(orgID, org.RoleViewer, nil), folder.Title) + response := createService(ac, ruleStore).RouteGetNamespaceRulesConfig(createRequestContext(orgID, org.RoleViewer, nil), folder.Title) require.Equal(t, http.StatusAccepted, response.Status()) result := &apimodels.NamespaceConfigResponse{} @@ -441,7 +392,7 @@ func TestRouteGetRulesConfig(t *testing.T) { request := createRequestContext(orgID, "", nil) t.Run("and do not return group if user does not have access to one of rules", func(t *testing.T) { ac := acMock.New().WithPermissions(createPermissionsForRules(append(group1, group2[1:]...))) - response := createService(ac, ruleStore, nil).RouteGetRulesConfig(request) + response := createService(ac, ruleStore).RouteGetRulesConfig(request) require.Equal(t, http.StatusOK, response.Status()) result := &apimodels.NamespaceConfigResponse{} @@ -471,7 +422,7 @@ func TestRouteGetRulesConfig(t *testing.T) { 
ruleStore.PutRule(context.Background(), expectedRules...) ac := acMock.New().WithDisabled() - response := createService(ac, ruleStore, nil).RouteGetRulesConfig(createRequestContext(orgID, org.RoleViewer, nil)) + response := createService(ac, ruleStore).RouteGetRulesConfig(createRequestContext(orgID, org.RoleViewer, nil)) require.Equal(t, http.StatusOK, response.Status()) result := &apimodels.NamespaceConfigResponse{} @@ -522,13 +473,13 @@ func TestRouteGetRulesGroupConfig(t *testing.T) { t.Run("and return 401 if user does not have access one of rules", func(t *testing.T) { ac := acMock.New().WithPermissions(createPermissionsForRules(expectedRules[1:])) - response := createService(ac, ruleStore, nil).RouteGetRulesGroupConfig(request, folder.Title, groupKey.RuleGroup) + response := createService(ac, ruleStore).RouteGetRulesGroupConfig(request, folder.Title, groupKey.RuleGroup) require.Equal(t, http.StatusUnauthorized, response.Status()) }) t.Run("and return rules if user has access to all of them", func(t *testing.T) { ac := acMock.New().WithPermissions(createPermissionsForRules(expectedRules)) - response := createService(ac, ruleStore, nil).RouteGetRulesGroupConfig(request, folder.Title, groupKey.RuleGroup) + response := createService(ac, ruleStore).RouteGetRulesGroupConfig(request, folder.Title, groupKey.RuleGroup) require.Equal(t, http.StatusAccepted, response.Status()) result := &apimodels.RuleGroupConfigResponse{} @@ -551,7 +502,7 @@ func TestRouteGetRulesGroupConfig(t *testing.T) { ruleStore.PutRule(context.Background(), expectedRules...) 
ac := acMock.New().WithDisabled() - response := createService(ac, ruleStore, nil).RouteGetRulesGroupConfig(createRequestContext(orgID, org.RoleViewer, nil), folder.Title, groupKey.RuleGroup) + response := createService(ac, ruleStore).RouteGetRulesGroupConfig(createRequestContext(orgID, org.RoleViewer, nil), folder.Title, groupKey.RuleGroup) require.Equal(t, http.StatusAccepted, response.Status()) result := &apimodels.RuleGroupConfigResponse{} @@ -638,19 +589,18 @@ func TestVerifyProvisionedRulesNotAffected(t *testing.T) { }) } -func createServiceWithProvenanceStore(ac *acMock.Mock, store *fakes.RuleStore, scheduler schedule.ScheduleService, provenanceStore provisioning.ProvisioningStore) *RulerSrv { - svc := createService(ac, store, scheduler) +func createServiceWithProvenanceStore(ac *acMock.Mock, store *fakes.RuleStore, provenanceStore provisioning.ProvisioningStore) *RulerSrv { + svc := createService(ac, store) svc.provenanceStore = provenanceStore return svc } -func createService(ac *acMock.Mock, store *fakes.RuleStore, scheduler schedule.ScheduleService) *RulerSrv { +func createService(ac *acMock.Mock, store *fakes.RuleStore) *RulerSrv { return &RulerSrv{ xactManager: store, store: store, QuotaService: nil, provenanceStore: provisioning.NewFakeProvisioningStore(), - scheduleService: scheduler, log: log.New("test"), cfg: nil, ac: ac, diff --git a/pkg/services/ngalert/ngalert.go b/pkg/services/ngalert/ngalert.go index dca37115d89..eba15581956 100644 --- a/pkg/services/ngalert/ngalert.go +++ b/pkg/services/ngalert/ngalert.go @@ -228,7 +228,7 @@ func (ng *AlertNG) init() error { // if it is required to include folder title to the alerts, we need to subscribe to changes of alert title if !ng.Cfg.UnifiedAlerting.ReservedLabels.IsReservedLabelDisabled(models.FolderTitleLabel) { - subscribeToFolderChanges(context.Background(), ng.Log, ng.bus, store, scheduler) + subscribeToFolderChanges(ng.Log, ng.bus, store) } ng.stateManager = stateManager @@ -248,7 +248,6 @@ func 
(ng *AlertNG) init() error { DatasourceCache: ng.DataSourceCache, DatasourceService: ng.DataSourceService, RouteRegister: ng.RouteRegister, - Schedule: ng.schedule, DataProxy: ng.DataProxy, QuotaService: ng.QuotaService, TransactionManager: store, @@ -296,26 +295,18 @@ func (ng *AlertNG) init() error { return DeclareFixedRoles(ng.accesscontrolService) } -func subscribeToFolderChanges(ctx context.Context, logger log.Logger, bus bus.Bus, dbStore api.RuleStore, scheduler schedule.ScheduleService) { +func subscribeToFolderChanges(logger log.Logger, bus bus.Bus, dbStore api.RuleStore) { // if folder title is changed, we update all alert rules in that folder to make sure that all peers (in HA mode) will update folder title and // clean up the current state bus.AddEventListener(func(ctx context.Context, e *events.FolderTitleUpdated) error { // do not block the upstream execution go func(evt *events.FolderTitleUpdated) { logger.Info("Got folder title updated event. updating rules in the folder", "folderUID", evt.UID) - updated, err := dbStore.IncreaseVersionForAllRulesInNamespace(ctx, evt.OrgID, evt.UID) + _, err := dbStore.IncreaseVersionForAllRulesInNamespace(ctx, evt.OrgID, evt.UID) if err != nil { logger.Error("Failed to update alert rules in the folder after its title was changed", "error", err, "folderUID", evt.UID, "folder", evt.Title) return } - if len(updated) > 0 { - logger.Info("Rules that belong to the folder have been updated successfully. Clearing their status", "folderUID", evt.UID, "updatedRules", len(updated)) - for _, key := range updated { - scheduler.UpdateAlertRule(key.AlertRuleKey, key.Version, key.IsPaused) - } - } else { - logger.Debug("No alert rules found in the folder. 
nothing to update", "folderUID", evt.UID, "folder", evt.Title) - } }(e) return nil }) diff --git a/pkg/services/ngalert/ngalert_test.go b/pkg/services/ngalert/ngalert_test.go index 547981418e8..9b75e022d7e 100644 --- a/pkg/services/ngalert/ngalert_test.go +++ b/pkg/services/ngalert/ngalert_test.go @@ -9,7 +9,6 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/grafana/grafana/pkg/bus" @@ -19,7 +18,6 @@ import ( "github.com/grafana/grafana/pkg/services/folder" "github.com/grafana/grafana/pkg/services/ngalert/metrics" "github.com/grafana/grafana/pkg/services/ngalert/models" - "github.com/grafana/grafana/pkg/services/ngalert/schedule" "github.com/grafana/grafana/pkg/services/ngalert/tests/fakes" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/util" @@ -39,10 +37,7 @@ func Test_subscribeToFolderChanges(t *testing.T) { db.Folders[orgID] = append(db.Folders[orgID], folder) db.PutRule(context.Background(), rules...) 
- scheduler := &schedule.FakeScheduleService{} - scheduler.On("UpdateAlertRule", mock.Anything, mock.Anything, mock.Anything).Return() - - subscribeToFolderChanges(context.Background(), log.New("test"), bus, db, scheduler) + subscribeToFolderChanges(log.New("test"), bus, db) err := bus.Publish(context.Background(), &events.FolderTitleUpdated{ Timestamp: time.Now(), @@ -62,20 +57,6 @@ func Test_subscribeToFolderChanges(t *testing.T) { return c, true })) > 0 }, time.Second, 10*time.Millisecond, "expected to call db store method but nothing was called") - - var calledTimes int - require.Eventuallyf(t, func() bool { - for _, call := range scheduler.Calls { - if call.Method == "UpdateAlertRule" { - calledTimes++ - } - } - return calledTimes == len(rules) - }, time.Second, 10*time.Millisecond, "scheduler was expected to be called %d times but called %d", len(rules), calledTimes) - - for _, rule := range rules { - scheduler.AssertCalled(t, "UpdateAlertRule", rule.GetKey(), rule.Version, false) - } } func TestConfigureHistorianBackend(t *testing.T) { diff --git a/pkg/services/ngalert/schedule/fetcher.go b/pkg/services/ngalert/schedule/fetcher.go index 81655a2e69d..1db61e9ad9a 100644 --- a/pkg/services/ngalert/schedule/fetcher.go +++ b/pkg/services/ngalert/schedule/fetcher.go @@ -34,9 +34,9 @@ func sortedUIDs(alertRules []*models.AlertRule) []string { } // updateSchedulableAlertRules updates the alert rules for the scheduler. -// It returns an error if the database is unavailable or the query returned -// an error. -func (sch *schedule) updateSchedulableAlertRules(ctx context.Context) error { +// It returns diff that contains rule keys that were updated since the last poll, +// and an error if the database query encountered problems. 
+func (sch *schedule) updateSchedulableAlertRules(ctx context.Context) (diff, error) { start := time.Now() defer func() { sch.metrics.UpdateSchedulableAlertRulesDuration.Observe( @@ -46,21 +46,21 @@ func (sch *schedule) updateSchedulableAlertRules(ctx context.Context) error { if !sch.schedulableAlertRules.isEmpty() { keys, err := sch.ruleStore.GetAlertRulesKeysForScheduling(ctx) if err != nil { - return err + return diff{}, err } if !sch.schedulableAlertRules.needsUpdate(keys) { sch.log.Debug("No changes detected. Skip updating") - return nil + return diff{}, nil } } - + // At this point, we know we need to re-fetch rules as there are changes. q := models.GetAlertRulesForSchedulingQuery{ PopulateFolders: !sch.disableGrafanaFolder, } if err := sch.ruleStore.GetAlertRulesForScheduling(ctx, &q); err != nil { - return fmt.Errorf("failed to get alert rules: %w", err) + return diff{}, fmt.Errorf("failed to get alert rules: %w", err) } - sch.log.Debug("Alert rules fetched", "rulesCount", len(q.ResultRules), "foldersCount", len(q.ResultFoldersTitles)) - sch.schedulableAlertRules.set(q.ResultRules, q.ResultFoldersTitles) - return nil + d := sch.schedulableAlertRules.set(q.ResultRules, q.ResultFoldersTitles) + sch.log.Debug("Alert rules fetched", "rulesCount", len(q.ResultRules), "foldersCount", len(q.ResultFoldersTitles), "updatedRules", len(d.updated)) + return d, nil } diff --git a/pkg/services/ngalert/schedule/registry.go b/pkg/services/ngalert/schedule/registry.go index d162e3d3957..2aea4c5ceec 100644 --- a/pkg/services/ngalert/schedule/registry.go +++ b/pkg/services/ngalert/schedule/registry.go @@ -3,7 +3,6 @@ package schedule import ( "context" "errors" - "fmt" "sync" "time" @@ -32,19 +31,6 @@ func (r *alertRuleInfoRegistry) getOrCreateInfo(context context.Context, key mod return info, !ok } -// get returns the channel for the specific alert rule -// if the key does not exist returns an error -func (r *alertRuleInfoRegistry) get(key models.AlertRuleKey) 
(*alertRuleInfo, error) { - r.mu.Lock() - defer r.mu.Unlock() - - info, ok := r.alertRuleInfo[key] - if !ok { - return nil, fmt.Errorf("%v key not found", key) - } - return info, nil -} - func (r *alertRuleInfoRegistry) exists(key models.AlertRuleKey) bool { r.mu.Lock() defer r.mu.Unlock() @@ -169,16 +155,19 @@ func (r *alertRulesRegistry) get(k models.AlertRuleKey) *models.AlertRule { return r.rules[k] } -// set replaces all rules in the registry. -func (r *alertRulesRegistry) set(rules []*models.AlertRule, folders map[string]string) { +// set replaces all rules in the registry. Returns difference between previous and the new current version of the registry +func (r *alertRulesRegistry) set(rules []*models.AlertRule, folders map[string]string) diff { r.mu.Lock() defer r.mu.Unlock() - r.rules = make(map[models.AlertRuleKey]*models.AlertRule) + rulesMap := make(map[models.AlertRuleKey]*models.AlertRule) for _, rule := range rules { - r.rules[rule.GetKey()] = rule + rulesMap[rule.GetKey()] = rule } + d := r.getDiff(rulesMap) + r.rules = rulesMap // return the map as is without copying because it is not mutated r.folderTitles = folders + return d } // update inserts or replaces a rule in the registry. @@ -219,3 +208,28 @@ func (r *alertRulesRegistry) needsUpdate(keys []models.AlertRuleKeyWithVersion) } return false } + +type diff struct { + updated map[models.AlertRuleKey]struct{} +} + +func (d diff) IsEmpty() bool { + return len(d.updated) == 0 +} + +// getDiff calculates difference between the list of rules fetched previously and provided keys. 
Returns diff where +// updated - a list of keys that exist in the registry but with different version. +func (r *alertRulesRegistry) getDiff(rules map[models.AlertRuleKey]*models.AlertRule) diff { + result := diff{ + updated: map[models.AlertRuleKey]struct{}{}, + } + for key, newRule := range rules { + oldRule, ok := r.rules[key] + if !ok || newRule.Version == oldRule.Version { + // a new rule or not updated + continue + } + result.updated[key] = struct{}{} + } + return result +} diff --git a/pkg/services/ngalert/schedule/registry_test.go b/pkg/services/ngalert/schedule/registry_test.go index 896962af74c..04a569b91d4 100644 --- a/pkg/services/ngalert/schedule/registry_test.go +++ b/pkg/services/ngalert/schedule/registry_test.go @@ -318,3 +318,53 @@ func TestSchedulableAlertRulesRegistry(t *testing.T) { assert.False(t, ok) assert.Nil(t, deleted) } + +func TestSchedulableAlertRulesRegistry_set(t *testing.T) { + _, initialRules := models.GenerateUniqueAlertRules(100, models.AlertRuleGen()) + init := make(map[models.AlertRuleKey]*models.AlertRule, len(initialRules)) + for _, rule := range initialRules { + init[rule.GetKey()] = rule + } + r := alertRulesRegistry{rules: init} + t.Run("should return empty diff if exactly the same rules", func(t *testing.T) { + newRules := make([]*models.AlertRule, 0, len(initialRules)) + for _, rule := range initialRules { + newRules = append(newRules, models.CopyRule(rule)) + } + diff := r.set(newRules, map[string]string{}) + require.Truef(t, diff.IsEmpty(), "Diff is not empty.
Probably we check something else than key + version") + }) + t.Run("should return empty diff if version does not change", func(t *testing.T) { + newRules := make([]*models.AlertRule, 0, len(initialRules)) + // generate random and then override rule key + version + _, randomNew := models.GenerateUniqueAlertRules(len(initialRules), models.AlertRuleGen()) + for i := 0; i < len(initialRules); i++ { + rule := randomNew[i] + oldRule := initialRules[i] + rule.UID = oldRule.UID + rule.OrgID = oldRule.OrgID + rule.Version = oldRule.Version + newRules = append(newRules, rule) + } + + diff := r.set(newRules, map[string]string{}) + require.Truef(t, diff.IsEmpty(), "Diff is not empty. Probably we check something else than key + version") + }) + t.Run("should return key in diff if version changes", func(t *testing.T) { + newRules := make([]*models.AlertRule, 0, len(initialRules)) + expectedUpdated := map[models.AlertRuleKey]struct{}{} + for i, rule := range initialRules { + cp := models.CopyRule(rule) + if i%2 == 0 { + cp.Version++ + expectedUpdated[cp.GetKey()] = struct{}{} + } + newRules = append(newRules, cp) + } + require.NotEmptyf(t, expectedUpdated, "Input parameters have changed. Nothing to assert") + + diff := r.set(newRules, map[string]string{}) + require.Falsef(t, diff.IsEmpty(), "Diff is empty but should not be") + require.Equal(t, expectedUpdated, diff.updated) + }) +} diff --git a/pkg/services/ngalert/schedule/schedule.go b/pkg/services/ngalert/schedule/schedule.go index fc2a8ad2dca..ac81e451301 100644 --- a/pkg/services/ngalert/schedule/schedule.go +++ b/pkg/services/ngalert/schedule/schedule.go @@ -29,16 +29,10 @@ import ( // ScheduleService is an interface for a service that schedules the evaluation // of alert rules. 
-// -//go:generate mockery --name ScheduleService --structname FakeScheduleService --inpackage --filename schedule_mock.go --unroll-variadic=False type ScheduleService interface { // Run the scheduler until the context is canceled or the scheduler returns // an error. The scheduler is terminated when this function returns. Run(context.Context) error - // UpdateAlertRule notifies scheduler that a rule has been changed - UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64, isPaused bool) - // DeleteAlertRule notifies scheduler that rules have been deleted - DeleteAlertRule(keys ...ngmodels.AlertRuleKey) } // AlertsSender is an interface for a service that is responsible for sending notifications to the end-user. @@ -148,17 +142,8 @@ func (sch *schedule) Run(ctx context.Context) error { return nil } -// UpdateAlertRule looks for the active rule evaluation and commands it to update the rule -func (sch *schedule) UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64, isPaused bool) { - ruleInfo, err := sch.registry.get(key) - if err != nil { - return - } - ruleInfo.update(ruleVersionAndPauseStatus{ruleVersion(lastVersion), isPaused}) -} - -// DeleteAlertRule stops evaluation of the rule, deletes it from active rules, and cleans up state cache. -func (sch *schedule) DeleteAlertRule(keys ...ngmodels.AlertRuleKey) { +// deleteAlertRule stops evaluation of the rule, deletes it from active rules, and cleans up state cache. +func (sch *schedule) deleteAlertRule(keys ...ngmodels.AlertRuleKey) { for _, key := range keys { // It can happen that the scheduler has deleted the alert rule before the // Ruler API has called DeleteAlertRule. 
This can happen as requests to @@ -230,12 +215,22 @@ func (sch *schedule) updateRulesMetrics(alertRules []*ngmodels.AlertRule) { sch.metrics.SchedulableAlertRulesHash.Set(float64(hashUIDs(alertRules))) } -func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.Group, tick time.Time) ([]readyToRunItem, map[ngmodels.AlertRuleKey]struct{}) { +// TODO refactor to accept a callback for tests that will be called with things that are returned currently, and return nothing. +// Returns a slice of rules that were scheduled for evaluation, map of stopped rules, and a slice of updated rules +func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.Group, tick time.Time) ([]readyToRunItem, map[ngmodels.AlertRuleKey]struct{}, []ngmodels.AlertRuleKeyWithVersion) { tickNum := tick.Unix() / int64(sch.baseInterval.Seconds()) - if err := sch.updateSchedulableAlertRules(ctx); err != nil { + // update the local registry. If there was a difference between the previous state and the current new state, rulesDiff will contain keys of rules that were updated. + rulesDiff, err := sch.updateSchedulableAlertRules(ctx) + updated := rulesDiff.updated + if updated == nil { // make sure map is not nil + updated = map[ngmodels.AlertRuleKey]struct{}{} + } + if err != nil { sch.log.Error("Failed to update alert rules", "error", err) } + + // this is the new current state. rulesDiff contains the previously existing rules that were different between this state and the previous state. alertRules, folderTitles := sch.schedulableAlertRules.all() // registeredDefinitions is a map used for finding deleted alert rules @@ -247,6 +242,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup. 
sch.updateRulesMetrics(alertRules) readyToRun := make([]readyToRunItem, 0) + updatedRules := make([]ngmodels.AlertRuleKeyWithVersion, 0, len(updated)) // this is needed for tests only missingFolder := make(map[string][]string) for _, item := range alertRules { key := item.GetKey() @@ -274,7 +270,8 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup. } itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds()) - if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 { + isReadyToRun := item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 + if isReadyToRun { var folderTitle string if !sch.disableGrafanaFolder { title, ok := folderTitles[item.NamespaceUID] @@ -290,6 +287,20 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup. folderTitle: folderTitle, }}) } + if _, isUpdated := updated[key]; isUpdated && !isReadyToRun { + // if we do not need to eval the rule, check whether the rule was just updated and if it was, notify evaluation routine about that + sch.log.Debug("Rule has been updated. Notifying evaluation routine", key.LogContext()...) + go func(ri *alertRuleInfo, rule *ngmodels.AlertRule) { + ri.update(ruleVersionAndPauseStatus{ + Version: ruleVersion(rule.Version), + IsPaused: rule.IsPaused, + }) + }(ruleInfo, item) + updatedRules = append(updatedRules, ngmodels.AlertRuleKeyWithVersion{ + Version: item.Version, + AlertRuleKey: item.GetKey(), + }) + } // remove the alert rule from the registered alert rules delete(registeredDefinitions, key) @@ -327,8 +338,8 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup. for key := range registeredDefinitions { toDelete = append(toDelete, key) } - sch.DeleteAlertRule(toDelete...) - return readyToRun, registeredDefinitions + sch.deleteAlertRule(toDelete...) 
+ return readyToRun, registeredDefinitions, updatedRules } func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, evalCh <-chan *evaluation, updateCh <-chan ruleVersionAndPauseStatus) error { diff --git a/pkg/services/ngalert/schedule/schedule_mock.go b/pkg/services/ngalert/schedule/schedule_mock.go deleted file mode 100644 index d9b1a9ac723..00000000000 --- a/pkg/services/ngalert/schedule/schedule_mock.go +++ /dev/null @@ -1,56 +0,0 @@ -// Code generated by mockery v2.10.0. DO NOT EDIT. - -package schedule - -import ( - context "context" - time "time" - - mock "github.com/stretchr/testify/mock" - - models "github.com/grafana/grafana/pkg/services/ngalert/models" -) - -// FakeScheduleService is an autogenerated mock type for the ScheduleService type -type FakeScheduleService struct { - mock.Mock -} - -// DeleteAlertRule provides a mock function with given fields: keys -func (_m *FakeScheduleService) DeleteAlertRule(keys ...models.AlertRuleKey) { - _m.Called(keys) -} - -// Run provides a mock function with given fields: _a0 -func (_m *FakeScheduleService) Run(_a0 context.Context) error { - ret := _m.Called(_a0) - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context) error); ok { - r0 = rf(_a0) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// UpdateAlertRule provides a mock function with given fields: key, lastVersion -func (_m *FakeScheduleService) UpdateAlertRule(key models.AlertRuleKey, lastVersion int64, isPaused bool) { - _m.Called(key, lastVersion, isPaused) -} - -// evalApplied provides a mock function with given fields: _a0, _a1 -func (_m *FakeScheduleService) evalApplied(_a0 models.AlertRuleKey, _a1 time.Time) { - _m.Called(_a0, _a1) -} - -// overrideCfg provides a mock function with given fields: cfg -func (_m *FakeScheduleService) overrideCfg(cfg SchedulerCfg) { - _m.Called(cfg) -} - -// stopApplied provides a mock function with given fields: _a0 -func (_m *FakeScheduleService) stopApplied(_a0 
models.AlertRuleKey) { - _m.Called(_a0) -} diff --git a/pkg/services/ngalert/schedule/schedule_unit_test.go b/pkg/services/ngalert/schedule/schedule_unit_test.go index abd94a4ae55..e6b96e46ce3 100644 --- a/pkg/services/ngalert/schedule/schedule_unit_test.go +++ b/pkg/services/ngalert/schedule/schedule_unit_test.go @@ -104,13 +104,13 @@ func TestProcessTicks(t *testing.T) { t.Run("on 1st tick alert rule should be evaluated", func(t *testing.T) { tick = tick.Add(cfg.BaseInterval) - scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + scheduled, stopped, updated := sched.processTick(ctx, dispatcherGroup, tick) require.Len(t, scheduled, 1) require.Equal(t, alertRule1, scheduled[0].rule) require.Equal(t, tick, scheduled[0].scheduledAt) require.Emptyf(t, stopped, "None rules are expected to be stopped") - + require.Emptyf(t, updated, "None rules are expected to be updated") assertEvalRun(t, evalAppliedCh, tick, alertRule1.GetKey()) }) @@ -132,12 +132,13 @@ func TestProcessTicks(t *testing.T) { t.Run("on 2nd tick first alert rule should be evaluated", func(t *testing.T) { tick = tick.Add(cfg.BaseInterval) - scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + scheduled, stopped, updated := sched.processTick(ctx, dispatcherGroup, tick) require.Len(t, scheduled, 1) require.Equal(t, alertRule1, scheduled[0].rule) require.Equal(t, tick, scheduled[0].scheduledAt) require.Emptyf(t, stopped, "None rules are expected to be stopped") + require.Emptyf(t, updated, "None rules are expected to be updated") assertEvalRun(t, evalAppliedCh, tick, alertRule1.GetKey()) }) @@ -155,7 +156,7 @@ func TestProcessTicks(t *testing.T) { t.Run("on 3rd tick two alert rules should be evaluated", func(t *testing.T) { tick = tick.Add(cfg.BaseInterval) - scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + scheduled, stopped, updated := sched.processTick(ctx, dispatcherGroup, tick) require.Len(t, scheduled, 2) var keys []models.AlertRuleKey for _, item 
:= range scheduled { @@ -166,19 +167,19 @@ func TestProcessTicks(t *testing.T) { require.Contains(t, keys, alertRule2.GetKey()) require.Emptyf(t, stopped, "None rules are expected to be stopped") - + require.Emptyf(t, updated, "None rules are expected to be updated") assertEvalRun(t, evalAppliedCh, tick, keys...) }) t.Run("on 4th tick only one alert rule should be evaluated", func(t *testing.T) { tick = tick.Add(cfg.BaseInterval) - scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + scheduled, stopped, updated := sched.processTick(ctx, dispatcherGroup, tick) require.Len(t, scheduled, 1) require.Equal(t, alertRule1, scheduled[0].rule) require.Equal(t, tick, scheduled[0].scheduledAt) require.Emptyf(t, stopped, "None rules are expected to be stopped") - + require.Emptyf(t, updated, "None rules are expected to be updated") assertEvalRun(t, evalAppliedCh, tick, alertRule1.GetKey()) }) @@ -187,13 +188,13 @@ func TestProcessTicks(t *testing.T) { alertRule1.IsPaused = true - scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + scheduled, stopped, updated := sched.processTick(ctx, dispatcherGroup, tick) require.Len(t, scheduled, 1) require.Equal(t, alertRule1, scheduled[0].rule) require.Equal(t, tick, scheduled[0].scheduledAt) require.Emptyf(t, stopped, "None rules are expected to be stopped") - + require.Emptyf(t, updated, "None rules are expected to be updated") assertEvalRun(t, evalAppliedCh, tick, alertRule1.GetKey()) }) @@ -214,7 +215,7 @@ func TestProcessTicks(t *testing.T) { alertRule2.IsPaused = true - scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + scheduled, stopped, updated := sched.processTick(ctx, dispatcherGroup, tick) require.Len(t, scheduled, 2) var keys []models.AlertRuleKey @@ -226,7 +227,7 @@ func TestProcessTicks(t *testing.T) { require.Contains(t, keys, alertRule2.GetKey()) require.Emptyf(t, stopped, "None rules are expected to be stopped") - + require.Emptyf(t, updated, "None rules are expected 
to be updated") assertEvalRun(t, evalAppliedCh, tick, keys...) }) @@ -248,13 +249,13 @@ func TestProcessTicks(t *testing.T) { alertRule1.IsPaused = false alertRule2.IsPaused = false - scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + scheduled, stopped, updated := sched.processTick(ctx, dispatcherGroup, tick) require.Len(t, scheduled, 1) require.Equal(t, alertRule1, scheduled[0].rule) require.Equal(t, tick, scheduled[0].scheduledAt) require.Emptyf(t, stopped, "None rules are expected to be stopped") - + require.Emptyf(t, updated, "None rules are expected to be updated") assertEvalRun(t, evalAppliedCh, tick, alertRule1.GetKey()) }) @@ -275,11 +276,11 @@ func TestProcessTicks(t *testing.T) { ruleStore.DeleteRule(alertRule1) - scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + scheduled, stopped, updated := sched.processTick(ctx, dispatcherGroup, tick) require.Empty(t, scheduled) require.Len(t, stopped, 1) - + require.Emptyf(t, updated, "None rules are expected to be updated") require.Contains(t, stopped, alertRule1.GetKey()) assertStopRun(t, stopAppliedCh, alertRule1.GetKey()) @@ -300,31 +301,54 @@ func TestProcessTicks(t *testing.T) { t.Run("on 9th tick one alert rule should be evaluated", func(t *testing.T) { tick = tick.Add(cfg.BaseInterval) - scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + scheduled, stopped, updated := sched.processTick(ctx, dispatcherGroup, tick) require.Len(t, scheduled, 1) require.Equal(t, alertRule2, scheduled[0].rule) require.Equal(t, tick, scheduled[0].scheduledAt) require.Emptyf(t, stopped, "None rules are expected to be stopped") - + require.Emptyf(t, updated, "None rules are expected to be updated") assertEvalRun(t, evalAppliedCh, tick, alertRule2.GetKey()) }) + // create alert rule with one base interval + alertRule3 := models.AlertRuleGen(models.WithOrgID(mainOrgID), models.WithInterval(cfg.BaseInterval), models.WithTitle("rule-3"))() + ruleStore.PutRule(ctx, alertRule3) + 
t.Run("on 10th tick a new alert rule should be evaluated", func(t *testing.T) { - // create alert rule with one base interval - alertRule3 := models.AlertRuleGen(models.WithOrgID(mainOrgID), models.WithInterval(cfg.BaseInterval), models.WithTitle("rule-3"))() - ruleStore.PutRule(ctx, alertRule3) tick = tick.Add(cfg.BaseInterval) - scheduled, stopped := sched.processTick(ctx, dispatcherGroup, tick) + scheduled, stopped, updated := sched.processTick(ctx, dispatcherGroup, tick) require.Len(t, scheduled, 1) require.Equal(t, alertRule3, scheduled[0].rule) require.Equal(t, tick, scheduled[0].scheduledAt) require.Emptyf(t, stopped, "None rules are expected to be stopped") - + require.Emptyf(t, updated, "None rules are expected to be updated") assertEvalRun(t, evalAppliedCh, tick, alertRule3.GetKey()) }) + t.Run("on 11th tick rule2 should be updated", func(t *testing.T) { + newRule2 := models.CopyRule(alertRule2) + newRule2.Version++ + expectedUpdated := models.AlertRuleKeyWithVersion{ + Version: newRule2.Version, + AlertRuleKey: newRule2.GetKey(), + } + + ruleStore.PutRule(context.Background(), newRule2) + + tick = tick.Add(cfg.BaseInterval) + scheduled, stopped, updated := sched.processTick(ctx, dispatcherGroup, tick) + + require.Len(t, scheduled, 1) + require.Equal(t, alertRule3, scheduled[0].rule) + require.Equal(t, tick, scheduled[0].scheduledAt) + + require.Emptyf(t, stopped, "None rules are expected to be stopped") + + require.Len(t, updated, 1) + require.Equal(t, expectedUpdated, updated[0]) + }) } func TestSchedule_ruleRoutine(t *testing.T) { @@ -719,49 +743,14 @@ func TestSchedule_ruleRoutine(t *testing.T) { }) } -func TestSchedule_UpdateAlertRule(t *testing.T) { - t.Run("when rule exists", func(t *testing.T) { - t.Run("it should call Update", func(t *testing.T) { - sch := setupScheduler(t, nil, nil, nil, nil, nil) - key := models.GenerateRuleKey(rand.Int63()) - info, _ := sch.registry.getOrCreateInfo(context.Background(), key) - version := rand.Int63() - go 
func() { - sch.UpdateAlertRule(key, version, false) - }() - - select { - case v := <-info.updateCh: - require.Equal(t, ruleVersionAndPauseStatus{ruleVersion(version), false}, v) - case <-time.After(5 * time.Second): - t.Fatal("No message was received on update channel") - } - }) - t.Run("should exit if rule is being stopped", func(t *testing.T) { - sch := setupScheduler(t, nil, nil, nil, nil, nil) - key := models.GenerateRuleKey(rand.Int63()) - info, _ := sch.registry.getOrCreateInfo(context.Background(), key) - info.stop(nil) - sch.UpdateAlertRule(key, rand.Int63(), false) - }) - }) - t.Run("when rule does not exist", func(t *testing.T) { - t.Run("should exit", func(t *testing.T) { - sch := setupScheduler(t, nil, nil, nil, nil, nil) - key := models.GenerateRuleKey(rand.Int63()) - sch.UpdateAlertRule(key, rand.Int63(), false) - }) - }) -} - -func TestSchedule_DeleteAlertRule(t *testing.T) { +func TestSchedule_deleteAlertRule(t *testing.T) { t.Run("when rule exists", func(t *testing.T) { t.Run("it should stop evaluation loop and remove the controller from registry", func(t *testing.T) { sch := setupScheduler(t, nil, nil, nil, nil, nil) rule := models.AlertRuleGen()() key := rule.GetKey() info, _ := sch.registry.getOrCreateInfo(context.Background(), key) - sch.DeleteAlertRule(key) + sch.deleteAlertRule(key) require.ErrorIs(t, info.ctx.Err(), errRuleDeleted) require.False(t, sch.registry.exists(key)) }) @@ -770,7 +759,7 @@ func TestSchedule_DeleteAlertRule(t *testing.T) { t.Run("should exit", func(t *testing.T) { sch := setupScheduler(t, nil, nil, nil, nil, nil) key := models.GenerateRuleKey(rand.Int63()) - sch.DeleteAlertRule(key) + sch.deleteAlertRule(key) }) }) }