mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Alerting: Update rules version when folder title is updated (#53013)
* remove support for bus from scheduler * rename event to FolderTitleUpdated and fire only if title has changed * add method to increase version of all rules that belong to a folder * update ngalert service to subscribe to folder title change event call data store and update scheduler * add tests
This commit is contained in:
@@ -71,7 +71,7 @@ type DataSourceCreated struct {
|
||||
OrgID int64 `json:"org_id"`
|
||||
}
|
||||
|
||||
type FolderUpdated struct {
|
||||
type FolderTitleUpdated struct {
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Title string `json:"name"`
|
||||
ID int64 `json:"id"`
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/bus"
|
||||
"github.com/grafana/grafana/pkg/events"
|
||||
@@ -207,6 +206,7 @@ func (f *FolderServiceImpl) UpdateFolder(ctx context.Context, user *models.Signe
|
||||
}
|
||||
|
||||
dashFolder := query.Result
|
||||
currentTitle := dashFolder.Title
|
||||
|
||||
if !dashFolder.IsFolder {
|
||||
return dashboards.ErrFolderNotFound
|
||||
@@ -238,14 +238,16 @@ func (f *FolderServiceImpl) UpdateFolder(ctx context.Context, user *models.Signe
|
||||
}
|
||||
cmd.Result = folder
|
||||
|
||||
if err := f.bus.Publish(ctx, &events.FolderUpdated{
|
||||
Timestamp: time.Now(),
|
||||
Title: folder.Title,
|
||||
ID: dash.Id,
|
||||
UID: dash.Uid,
|
||||
OrgID: orgID,
|
||||
}); err != nil {
|
||||
f.log.Error("failed to publish FolderUpdated event", "folder", folder.Title, "user", user.UserId, "error", err)
|
||||
if currentTitle != folder.Title {
|
||||
if err := f.bus.Publish(ctx, &events.FolderTitleUpdated{
|
||||
Timestamp: folder.Updated,
|
||||
Title: folder.Title,
|
||||
ID: dash.Id,
|
||||
UID: dash.Uid,
|
||||
OrgID: orgID,
|
||||
}); err != nil {
|
||||
f.log.Error("failed to publish FolderTitleUpdated event", "folder", folder.Title, "user", user.UserId, "error", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
||||
@@ -188,8 +188,13 @@ func (alertRule *AlertRule) Diff(rule *AlertRule, ignore ...string) cmputil.Diff
|
||||
|
||||
// AlertRuleKey is the alert definition identifier
|
||||
type AlertRuleKey struct {
|
||||
OrgID int64
|
||||
UID string
|
||||
OrgID int64 `xorm:"org_id"`
|
||||
UID string `xorm:"uid"`
|
||||
}
|
||||
|
||||
type AlertRuleKeyWithVersion struct {
|
||||
Version int64
|
||||
AlertRuleKey `xorm:"extends"`
|
||||
}
|
||||
|
||||
// AlertRuleGroupKey is the identifier of a group of alerts
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
|
||||
models2 "github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
)
|
||||
|
||||
@@ -132,6 +133,18 @@ func WithSequentialGroupIndex() AlertRuleMutator {
|
||||
}
|
||||
}
|
||||
|
||||
func WithOrgID(orgId int64) AlertRuleMutator {
|
||||
return func(rule *AlertRule) {
|
||||
rule.OrgID = orgId
|
||||
}
|
||||
}
|
||||
|
||||
func WithNamespace(namespace *models2.Folder) AlertRuleMutator {
|
||||
return func(rule *AlertRule) {
|
||||
rule.NamespaceUID = namespace.Uid
|
||||
}
|
||||
}
|
||||
|
||||
func GenerateAlertLabels(count int, prefix string) data.Labels {
|
||||
labels := make(data.Labels, count)
|
||||
for i := 0; i < count; i++ {
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
|
||||
"github.com/grafana/grafana/pkg/api/routing"
|
||||
"github.com/grafana/grafana/pkg/bus"
|
||||
"github.com/grafana/grafana/pkg/events"
|
||||
"github.com/grafana/grafana/pkg/expr"
|
||||
"github.com/grafana/grafana/pkg/infra/kvstore"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
@@ -21,6 +22,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/image"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/provisioning"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
|
||||
@@ -162,7 +164,12 @@ func (ng *AlertNG) init() error {
|
||||
}
|
||||
|
||||
stateManager := state.NewManager(ng.Log, ng.Metrics.GetStateMetrics(), appUrl, store, store, ng.dashboardService, ng.imageService, clk)
|
||||
scheduler := schedule.NewScheduler(schedCfg, appUrl, stateManager, ng.bus)
|
||||
scheduler := schedule.NewScheduler(schedCfg, appUrl, stateManager)
|
||||
|
||||
// if it is required to include folder title to the alerts, we need to subscribe to changes of alert title
|
||||
if !ng.Cfg.UnifiedAlerting.ReservedLabels.IsReservedLabelDisabled(models.FolderTitleLabel) {
|
||||
subscribeToFolderChanges(ng.Log, ng.bus, store, scheduler)
|
||||
}
|
||||
|
||||
ng.stateManager = stateManager
|
||||
ng.schedule = scheduler
|
||||
@@ -207,6 +214,31 @@ func (ng *AlertNG) init() error {
|
||||
return DeclareFixedRoles(ng.accesscontrol)
|
||||
}
|
||||
|
||||
func subscribeToFolderChanges(logger log.Logger, bus bus.Bus, dbStore store.RuleStore, scheduler schedule.ScheduleService) {
|
||||
// if folder title is changed, we update all alert rules in that folder to make sure that all peers (in HA mode) will update folder title and
|
||||
// clean up the current state
|
||||
bus.AddEventListener(func(ctx context.Context, e *events.FolderTitleUpdated) error {
|
||||
// do not block the upstream execution
|
||||
go func(evt *events.FolderTitleUpdated) {
|
||||
logger.Debug("Got folder title updated event. updating rules in the folder", "folder_uid", evt.UID)
|
||||
updated, err := dbStore.IncreaseVersionForAllRulesInNamespace(context.Background(), evt.OrgID, evt.UID)
|
||||
if err != nil {
|
||||
logger.Error("Failed to update alert rules in the folder after its title was changed", "err", err, "folder_uid", evt.UID, "folder", evt.Title)
|
||||
return
|
||||
}
|
||||
if len(updated) > 0 {
|
||||
logger.Debug("rules that belong to the folder have been updated successfully. clearing their status", "updated_rules", len(updated))
|
||||
for _, key := range updated {
|
||||
scheduler.UpdateAlertRule(key.AlertRuleKey, key.Version)
|
||||
}
|
||||
} else {
|
||||
logger.Debug("no alert rules found in the folder. nothing to update", "folder_uid", evt.UID, "folder", evt.Title)
|
||||
}
|
||||
}(e)
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
// Run starts the scheduler and Alertmanager.
|
||||
func (ng *AlertNG) Run(ctx context.Context) error {
|
||||
ng.Log.Debug("ngalert starting")
|
||||
|
||||
73
pkg/services/ngalert/ngalert_test.go
Normal file
73
pkg/services/ngalert/ngalert_test.go
Normal file
@@ -0,0 +1,73 @@
|
||||
package ngalert
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
busmock "github.com/grafana/grafana/pkg/bus/mock"
|
||||
"github.com/grafana/grafana/pkg/events"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
models2 "github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/schedule"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/store"
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
)
|
||||
|
||||
func Test_subscribeToFolderChanges(t *testing.T) {
|
||||
orgID := rand.Int63()
|
||||
folder := &models2.Folder{
|
||||
Id: 0,
|
||||
Uid: util.GenerateShortUID(),
|
||||
Title: "Folder" + util.GenerateShortUID(),
|
||||
}
|
||||
rules := models.GenerateAlertRules(5, models.AlertRuleGen(models.WithOrgID(orgID), models.WithNamespace(folder)))
|
||||
|
||||
bus := busmock.New()
|
||||
db := store.NewFakeRuleStore(t)
|
||||
db.Folders[orgID] = append(db.Folders[orgID], folder)
|
||||
db.PutRule(context.Background(), rules...)
|
||||
|
||||
scheduler := &schedule.FakeScheduleService{}
|
||||
scheduler.EXPECT().UpdateAlertRule(mock.Anything, mock.Anything).Return()
|
||||
|
||||
subscribeToFolderChanges(log.New("test"), bus, db, scheduler)
|
||||
|
||||
err := bus.Publish(context.Background(), &events.FolderTitleUpdated{
|
||||
Timestamp: time.Now(),
|
||||
Title: "Folder" + util.GenerateShortUID(),
|
||||
ID: folder.Id,
|
||||
UID: folder.Uid,
|
||||
OrgID: orgID,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Eventuallyf(t, func() bool {
|
||||
return len(db.GetRecordedCommands(func(cmd interface{}) (interface{}, bool) {
|
||||
c, ok := cmd.(store.GenericRecordedQuery)
|
||||
if !ok || c.Name != "IncreaseVersionForAllRulesInNamespace" {
|
||||
return nil, false
|
||||
}
|
||||
return c, true
|
||||
})) > 0
|
||||
}, time.Second, 10*time.Millisecond, "expected to call db store method but nothing was called")
|
||||
|
||||
var calledTimes int
|
||||
require.Eventuallyf(t, func() bool {
|
||||
for _, call := range scheduler.Calls {
|
||||
if call.Method == "UpdateAlertRule" {
|
||||
calledTimes++
|
||||
}
|
||||
}
|
||||
return calledTimes == len(rules)
|
||||
}, time.Second, 10*time.Millisecond, "scheduler was expected to be called %d times but called %d", len(rules), calledTimes)
|
||||
|
||||
for _, rule := range rules {
|
||||
scheduler.AssertCalled(t, "UpdateAlertRule", rule.GetKey(), rule.Version)
|
||||
}
|
||||
}
|
||||
@@ -8,8 +8,6 @@ import (
|
||||
|
||||
prometheusModel "github.com/prometheus/common/model"
|
||||
|
||||
"github.com/grafana/grafana/pkg/bus"
|
||||
"github.com/grafana/grafana/pkg/events"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/models"
|
||||
"github.com/grafana/grafana/pkg/services/alerting"
|
||||
@@ -27,23 +25,19 @@ import (
|
||||
|
||||
// ScheduleService is an interface for a service that schedules the evaluation
|
||||
// of alert rules.
|
||||
//go:generate mockery --name ScheduleService --structname FakeScheduleService --inpackage --filename schedule_mock.go
|
||||
//go:generate mockery --name ScheduleService --structname FakeScheduleService --inpackage --filename schedule_mock.go --with-expecter
|
||||
type ScheduleService interface {
|
||||
// Run the scheduler until the context is canceled or the scheduler returns
|
||||
// an error. The scheduler is terminated when this function returns.
|
||||
Run(context.Context) error
|
||||
// UpdateAlertRule notifies scheduler that a rule has been changed
|
||||
UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int64)
|
||||
// UpdateAlertRulesByNamespaceUID notifies scheduler that all rules in a namespace should be updated.
|
||||
UpdateAlertRulesByNamespaceUID(ctx context.Context, orgID int64, uid string) error
|
||||
// DeleteAlertRule notifies scheduler that a rule has been changed
|
||||
DeleteAlertRule(key ngmodels.AlertRuleKey)
|
||||
// the following are used by tests only used for tests
|
||||
evalApplied(ngmodels.AlertRuleKey, time.Time)
|
||||
stopApplied(ngmodels.AlertRuleKey)
|
||||
overrideCfg(cfg SchedulerCfg)
|
||||
|
||||
folderUpdateHandler(ctx context.Context, evt *events.FolderUpdated) error
|
||||
}
|
||||
|
||||
//go:generate mockery --name AlertsSender --structname AlertsSenderMock --inpackage --filename alerts_sender_mock.go --with-expecter
|
||||
@@ -97,9 +91,6 @@ type schedule struct {
|
||||
// current tick depends on its evaluation interval and when it was
|
||||
// last evaluated.
|
||||
schedulableAlertRules alertRulesRegistry
|
||||
|
||||
// bus is used to hook into events that should cause rule updates.
|
||||
bus bus.Bus
|
||||
}
|
||||
|
||||
// SchedulerCfg is the scheduler configuration.
|
||||
@@ -117,7 +108,7 @@ type SchedulerCfg struct {
|
||||
}
|
||||
|
||||
// NewScheduler returns a new schedule.
|
||||
func NewScheduler(cfg SchedulerCfg, appURL *url.URL, stateManager *state.Manager, bus bus.Bus) *schedule {
|
||||
func NewScheduler(cfg SchedulerCfg, appURL *url.URL, stateManager *state.Manager) *schedule {
|
||||
ticker := alerting.NewTicker(cfg.C, cfg.Cfg.BaseInterval, cfg.Metrics.Ticker)
|
||||
|
||||
sch := schedule{
|
||||
@@ -138,12 +129,9 @@ func NewScheduler(cfg SchedulerCfg, appURL *url.URL, stateManager *state.Manager
|
||||
stateManager: stateManager,
|
||||
minRuleInterval: cfg.Cfg.MinInterval,
|
||||
schedulableAlertRules: alertRulesRegistry{rules: make(map[ngmodels.AlertRuleKey]*ngmodels.AlertRule)},
|
||||
bus: bus,
|
||||
alertsSender: cfg.AlertSender,
|
||||
}
|
||||
|
||||
bus.AddEventListener(sch.folderUpdateHandler)
|
||||
|
||||
return &sch
|
||||
}
|
||||
|
||||
@@ -165,26 +153,6 @@ func (sch *schedule) UpdateAlertRule(key ngmodels.AlertRuleKey, lastVersion int6
|
||||
ruleInfo.update(ruleVersion(lastVersion))
|
||||
}
|
||||
|
||||
// UpdateAlertRulesByNamespaceUID looks for the active rule evaluation for every rule in the given namespace and commands it to update the rule.
|
||||
func (sch *schedule) UpdateAlertRulesByNamespaceUID(ctx context.Context, orgID int64, uid string) error {
|
||||
q := ngmodels.ListAlertRulesQuery{
|
||||
OrgID: orgID,
|
||||
NamespaceUIDs: []string{uid},
|
||||
}
|
||||
if err := sch.ruleStore.ListAlertRules(ctx, &q); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, r := range q.Result {
|
||||
sch.UpdateAlertRule(ngmodels.AlertRuleKey{
|
||||
OrgID: orgID,
|
||||
UID: r.UID,
|
||||
}, r.Version)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// DeleteAlertRule stops evaluation of the rule, deletes it from active rules, and cleans up state cache.
|
||||
func (sch *schedule) DeleteAlertRule(key ngmodels.AlertRuleKey) {
|
||||
// It can happen that the scheduler has deleted the alert rule before the
|
||||
@@ -465,14 +433,6 @@ func (sch *schedule) saveAlertStates(ctx context.Context, states []*state.State)
|
||||
}
|
||||
}
|
||||
|
||||
// folderUpdateHandler listens for folder update events and updates all rules in the given folder.
|
||||
func (sch *schedule) folderUpdateHandler(ctx context.Context, evt *events.FolderUpdated) error {
|
||||
if sch.disableGrafanaFolder {
|
||||
return nil
|
||||
}
|
||||
return sch.UpdateAlertRulesByNamespaceUID(ctx, evt.OrgID, evt.UID)
|
||||
}
|
||||
|
||||
// overrideCfg is only used on tests.
|
||||
func (sch *schedule) overrideCfg(cfg SchedulerCfg) {
|
||||
sch.clock = cfg.C
|
||||
|
||||
@@ -5,10 +5,8 @@ package schedule
|
||||
import (
|
||||
context "context"
|
||||
|
||||
events "github.com/grafana/grafana/pkg/events"
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
|
||||
models "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
|
||||
time "time"
|
||||
)
|
||||
@@ -18,11 +16,42 @@ type FakeScheduleService struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
type FakeScheduleService_Expecter struct {
|
||||
mock *mock.Mock
|
||||
}
|
||||
|
||||
func (_m *FakeScheduleService) EXPECT() *FakeScheduleService_Expecter {
|
||||
return &FakeScheduleService_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// DeleteAlertRule provides a mock function with given fields: key
|
||||
func (_m *FakeScheduleService) DeleteAlertRule(key models.AlertRuleKey) {
|
||||
_m.Called(key)
|
||||
}
|
||||
|
||||
// FakeScheduleService_DeleteAlertRule_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteAlertRule'
|
||||
type FakeScheduleService_DeleteAlertRule_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// DeleteAlertRule is a helper method to define mock.On call
|
||||
// - key models.AlertRuleKey
|
||||
func (_e *FakeScheduleService_Expecter) DeleteAlertRule(key interface{}) *FakeScheduleService_DeleteAlertRule_Call {
|
||||
return &FakeScheduleService_DeleteAlertRule_Call{Call: _e.mock.On("DeleteAlertRule", key)}
|
||||
}
|
||||
|
||||
func (_c *FakeScheduleService_DeleteAlertRule_Call) Run(run func(key models.AlertRuleKey)) *FakeScheduleService_DeleteAlertRule_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(models.AlertRuleKey))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *FakeScheduleService_DeleteAlertRule_Call) Return() *FakeScheduleService_DeleteAlertRule_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
// Run provides a mock function with given fields: _a0
|
||||
func (_m *FakeScheduleService) Run(_a0 context.Context) error {
|
||||
ret := _m.Called(_a0)
|
||||
@@ -37,23 +66,56 @@ func (_m *FakeScheduleService) Run(_a0 context.Context) error {
|
||||
return r0
|
||||
}
|
||||
|
||||
// FakeScheduleService_Run_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Run'
|
||||
type FakeScheduleService_Run_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// Run is a helper method to define mock.On call
|
||||
// - _a0 context.Context
|
||||
func (_e *FakeScheduleService_Expecter) Run(_a0 interface{}) *FakeScheduleService_Run_Call {
|
||||
return &FakeScheduleService_Run_Call{Call: _e.mock.On("Run", _a0)}
|
||||
}
|
||||
|
||||
func (_c *FakeScheduleService_Run_Call) Run(run func(_a0 context.Context)) *FakeScheduleService_Run_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(context.Context))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *FakeScheduleService_Run_Call) Return(_a0 error) *FakeScheduleService_Run_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
// UpdateAlertRule provides a mock function with given fields: key, lastVersion
|
||||
func (_m *FakeScheduleService) UpdateAlertRule(key models.AlertRuleKey, lastVersion int64) {
|
||||
_m.Called(key, lastVersion)
|
||||
}
|
||||
|
||||
// UpdateAlertRulesByNamespaceUID provides a mock function with given fields: ctx, orgID, uid
|
||||
func (_m *FakeScheduleService) UpdateAlertRulesByNamespaceUID(ctx context.Context, orgID int64, uid string) error {
|
||||
ret := _m.Called(ctx, orgID, uid)
|
||||
// FakeScheduleService_UpdateAlertRule_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateAlertRule'
|
||||
type FakeScheduleService_UpdateAlertRule_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, int64, string) error); ok {
|
||||
r0 = rf(ctx, orgID, uid)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
// UpdateAlertRule is a helper method to define mock.On call
|
||||
// - key models.AlertRuleKey
|
||||
// - lastVersion int64
|
||||
func (_e *FakeScheduleService_Expecter) UpdateAlertRule(key interface{}, lastVersion interface{}) *FakeScheduleService_UpdateAlertRule_Call {
|
||||
return &FakeScheduleService_UpdateAlertRule_Call{Call: _e.mock.On("UpdateAlertRule", key, lastVersion)}
|
||||
}
|
||||
|
||||
return r0
|
||||
func (_c *FakeScheduleService_UpdateAlertRule_Call) Run(run func(key models.AlertRuleKey, lastVersion int64)) *FakeScheduleService_UpdateAlertRule_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(models.AlertRuleKey), args[1].(int64))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *FakeScheduleService_UpdateAlertRule_Call) Return() *FakeScheduleService_UpdateAlertRule_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
// evalApplied provides a mock function with given fields: _a0, _a1
|
||||
@@ -61,18 +123,28 @@ func (_m *FakeScheduleService) evalApplied(_a0 models.AlertRuleKey, _a1 time.Tim
|
||||
_m.Called(_a0, _a1)
|
||||
}
|
||||
|
||||
// folderUpdateHandler provides a mock function with given fields: ctx, evt
|
||||
func (_m *FakeScheduleService) folderUpdateHandler(ctx context.Context, evt *events.FolderUpdated) error {
|
||||
ret := _m.Called(ctx, evt)
|
||||
// FakeScheduleService_evalApplied_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'evalApplied'
|
||||
type FakeScheduleService_evalApplied_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
var r0 error
|
||||
if rf, ok := ret.Get(0).(func(context.Context, *events.FolderUpdated) error); ok {
|
||||
r0 = rf(ctx, evt)
|
||||
} else {
|
||||
r0 = ret.Error(0)
|
||||
}
|
||||
// evalApplied is a helper method to define mock.On call
|
||||
// - _a0 models.AlertRuleKey
|
||||
// - _a1 time.Time
|
||||
func (_e *FakeScheduleService_Expecter) evalApplied(_a0 interface{}, _a1 interface{}) *FakeScheduleService_evalApplied_Call {
|
||||
return &FakeScheduleService_evalApplied_Call{Call: _e.mock.On("evalApplied", _a0, _a1)}
|
||||
}
|
||||
|
||||
return r0
|
||||
func (_c *FakeScheduleService_evalApplied_Call) Run(run func(_a0 models.AlertRuleKey, _a1 time.Time)) *FakeScheduleService_evalApplied_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(models.AlertRuleKey), args[1].(time.Time))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *FakeScheduleService_evalApplied_Call) Return() *FakeScheduleService_evalApplied_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
// overrideCfg provides a mock function with given fields: cfg
|
||||
@@ -80,7 +152,53 @@ func (_m *FakeScheduleService) overrideCfg(cfg SchedulerCfg) {
|
||||
_m.Called(cfg)
|
||||
}
|
||||
|
||||
// FakeScheduleService_overrideCfg_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'overrideCfg'
|
||||
type FakeScheduleService_overrideCfg_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// overrideCfg is a helper method to define mock.On call
|
||||
// - cfg SchedulerCfg
|
||||
func (_e *FakeScheduleService_Expecter) overrideCfg(cfg interface{}) *FakeScheduleService_overrideCfg_Call {
|
||||
return &FakeScheduleService_overrideCfg_Call{Call: _e.mock.On("overrideCfg", cfg)}
|
||||
}
|
||||
|
||||
func (_c *FakeScheduleService_overrideCfg_Call) Run(run func(cfg SchedulerCfg)) *FakeScheduleService_overrideCfg_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(SchedulerCfg))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *FakeScheduleService_overrideCfg_Call) Return() *FakeScheduleService_overrideCfg_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
// stopApplied provides a mock function with given fields: _a0
|
||||
func (_m *FakeScheduleService) stopApplied(_a0 models.AlertRuleKey) {
|
||||
_m.Called(_a0)
|
||||
}
|
||||
|
||||
// FakeScheduleService_stopApplied_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'stopApplied'
|
||||
type FakeScheduleService_stopApplied_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// stopApplied is a helper method to define mock.On call
|
||||
// - _a0 models.AlertRuleKey
|
||||
func (_e *FakeScheduleService_Expecter) stopApplied(_a0 interface{}) *FakeScheduleService_stopApplied_Call {
|
||||
return &FakeScheduleService_stopApplied_Call{Call: _e.mock.On("stopApplied", _a0)}
|
||||
}
|
||||
|
||||
func (_c *FakeScheduleService_stopApplied_Call) Run(run func(_a0 models.AlertRuleKey)) *FakeScheduleService_stopApplied_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(models.AlertRuleKey))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *FakeScheduleService_stopApplied_Call) Return() *FakeScheduleService_stopApplied_Call {
|
||||
_c.Call.Return()
|
||||
return _c
|
||||
}
|
||||
|
||||
@@ -18,7 +18,6 @@ import (
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
busmock "github.com/grafana/grafana/pkg/bus/mock"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/dashboards"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
@@ -172,7 +171,7 @@ func TestAlertingTicker(t *testing.T) {
|
||||
Scheme: "http",
|
||||
Host: "localhost",
|
||||
}
|
||||
sched := schedule.NewScheduler(schedCfg, appUrl, st, busmock.New())
|
||||
sched := schedule.NewScheduler(schedCfg, appUrl, st)
|
||||
|
||||
go func() {
|
||||
err := sched.Run(ctx)
|
||||
|
||||
@@ -19,7 +19,6 @@ import (
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
busmock "github.com/grafana/grafana/pkg/bus/mock"
|
||||
"github.com/grafana/grafana/pkg/expr"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/annotations"
|
||||
@@ -527,7 +526,7 @@ func setupScheduler(t *testing.T, rs *store.FakeRuleStore, is *store.FakeInstanc
|
||||
AlertSender: senderMock,
|
||||
}
|
||||
st := state.NewManager(schedCfg.Logger, m.GetStateMetrics(), nil, rs, is, &dashboards.FakeDashboardService{}, &image.NoopImageService{}, mockedClock)
|
||||
return NewScheduler(schedCfg, appUrl, st, busmock.New())
|
||||
return NewScheduler(schedCfg, appUrl, st)
|
||||
}
|
||||
|
||||
func withQueryForState(t *testing.T, evalResult eval.State) models.AlertRuleMutator {
|
||||
|
||||
@@ -55,6 +55,9 @@ type RuleStore interface {
|
||||
// and return the map of uuid to id.
|
||||
InsertAlertRules(ctx context.Context, rule []ngmodels.AlertRule) (map[string]int64, error)
|
||||
UpdateAlertRules(ctx context.Context, rule []UpdateRule) error
|
||||
|
||||
// IncreaseVersionForAllRulesInNamespace Increases version for all rules that have specified namespace. Returns all rules that belong to the namespace
|
||||
IncreaseVersionForAllRulesInNamespace(ctx context.Context, orgID int64, namespaceUID string) ([]ngmodels.AlertRuleKeyWithVersion, error)
|
||||
}
|
||||
|
||||
func getAlertRuleByUID(sess *sqlstore.DBSession, alertRuleUID string, orgID int64) (*ngmodels.AlertRule, error) {
|
||||
@@ -95,6 +98,20 @@ func (st DBstore) DeleteAlertRulesByUID(ctx context.Context, orgID int64, ruleUI
|
||||
})
|
||||
}
|
||||
|
||||
// IncreaseVersionForAllRulesInNamespace Increases version for all rules that have specified namespace. Returns all rules that belong to the namespace
|
||||
func (st DBstore) IncreaseVersionForAllRulesInNamespace(ctx context.Context, orgID int64, namespaceUID string) ([]ngmodels.AlertRuleKeyWithVersion, error) {
|
||||
var keys []ngmodels.AlertRuleKeyWithVersion
|
||||
err := st.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
now := TimeNow()
|
||||
_, err := sess.Exec("UPDATE alert_rule SET version = version + 1, updated = ? WHERE namespace_uid = ? AND org_id = ?", now, namespaceUID, orgID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return sess.Table(ngmodels.AlertRule{}).Where("namespace_uid = ? AND org_id = ?", namespaceUID, orgID).Find(&keys)
|
||||
})
|
||||
return keys, err
|
||||
}
|
||||
|
||||
// DeleteAlertInstancesByRuleUID is a handler for deleting alert instances by alert rule UID when a rule has been updated
|
||||
func (st DBstore) DeleteAlertInstancesByRuleUID(ctx context.Context, orgID int64, ruleUID string) error {
|
||||
return st.SQLStore.WithTransactionalDbSession(ctx, func(sess *sqlstore.DBSession) error {
|
||||
|
||||
@@ -351,6 +351,30 @@ func (f *FakeRuleStore) UpdateRuleGroup(ctx context.Context, orgID int64, namesp
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FakeRuleStore) IncreaseVersionForAllRulesInNamespace(_ context.Context, orgID int64, namespaceUID string) ([]models.AlertRuleKeyWithVersion, error) {
|
||||
f.mtx.Lock()
|
||||
defer f.mtx.Unlock()
|
||||
|
||||
f.RecordedOps = append(f.RecordedOps, GenericRecordedQuery{
|
||||
Name: "IncreaseVersionForAllRulesInNamespace",
|
||||
Params: []interface{}{orgID, namespaceUID},
|
||||
})
|
||||
|
||||
var result []models.AlertRuleKeyWithVersion
|
||||
|
||||
for _, rule := range f.Rules[orgID] {
|
||||
if rule.NamespaceUID == namespaceUID && rule.OrgID == orgID {
|
||||
rule.Version++
|
||||
rule.Updated = TimeNow()
|
||||
result = append(result, models.AlertRuleKeyWithVersion{
|
||||
Version: rule.Version,
|
||||
AlertRuleKey: rule.GetKey(),
|
||||
})
|
||||
}
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
type FakeInstanceStore struct {
|
||||
mtx sync.Mutex
|
||||
RecordedOps []interface{}
|
||||
|
||||
Reference in New Issue
Block a user