Alerting: Scheduler to fetch folders along with rules (#52842)

* Update GetAlertRulesForScheduling to query for folders (if needed)
* Update scheduler's alertRulesRegistry to cache folder titles along with rules
* Update rule eval loop to take the folder title from the evaluation it receives
* Extract interface RulesStore with the methods the scheduler needs from the rule store
* Pre-fetch the rule keys with the version to detect changes, and query the full table only if there are changes.
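The pre-fetch described in the last bullet can be summarized with a short sketch. This is a simplified illustration, not the actual Grafana types: ruleKey, keyWithVersion, fetchKeys, and fetchAll are made-up stand-ins for the alertRulesRegistry.needsUpdate and updateSchedulableAlertRules code shown in the diff below.

// Sketch: load only (key, version) pairs first; hit the full alert_rule
// table (and folder titles) only when the cached set differs.
type ruleKey struct {
    orgID int64
    uid   string
}

type keyWithVersion struct {
    key     ruleKey
    version int64
}

// needsUpdate reports whether the cached rules differ from the keys just fetched.
func needsUpdate(cached map[ruleKey]int64, keys []keyWithVersion) bool {
    if len(cached) != len(keys) {
        return true // a rule was added or deleted
    }
    for _, k := range keys {
        if v, ok := cached[k.key]; !ok || v != k.version {
            return true // unknown rule or a newer version of a known rule
        }
    }
    return false
}

// refresh is the cheap-path/expensive-path decision made on every scheduler tick.
func refresh(cached map[ruleKey]int64, fetchKeys func() []keyWithVersion, fetchAll func()) {
    if len(cached) > 0 && !needsUpdate(cached, fetchKeys()) {
        return // nothing changed since the last tick; skip the full query
    }
    fetchAll() // re-read rules and, when folders are enabled, their titles
}

In the actual change, the keys come from GetAlertRulesKeysForScheduling as models.AlertRuleKeyWithVersion values and the full fetch is GetAlertRulesForScheduling with PopulateFolders set.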
Yuriy Tseretyan 2022-08-31 11:08:19 -04:00 committed by GitHub
parent 60839d9c30
commit 76ea0b15ae
9 changed files with 299 additions and 87 deletions

File 1 of 9

@@ -344,7 +344,10 @@ type ListAlertRulesQuery struct {
 }
 type GetAlertRulesForSchedulingQuery struct {
-    Result []*AlertRule
+    PopulateFolders     bool
+    ResultRules         []*AlertRule
+    ResultFoldersTitles map[string]string
 }
 // ListNamespaceAlertRulesQuery is the query for listing namespace alert rules

File 2 of 9

@@ -43,11 +43,24 @@ func (sch *schedule) updateSchedulableAlertRules(ctx context.Context) error {
             time.Since(start).Seconds())
     }()
-    q := models.GetAlertRulesForSchedulingQuery{}
+    if !sch.schedulableAlertRules.isEmpty() {
+        keys, err := sch.ruleStore.GetAlertRulesKeysForScheduling(ctx)
+        if err != nil {
+            return err
+        }
+        if !sch.schedulableAlertRules.needsUpdate(keys) {
+            sch.log.Info("no changes detected. Skip updating")
+            return nil
+        }
+    }
+    q := models.GetAlertRulesForSchedulingQuery{
+        PopulateFolders: !sch.disableGrafanaFolder,
+    }
     if err := sch.ruleStore.GetAlertRulesForScheduling(ctx, &q); err != nil {
         return fmt.Errorf("failed to get alert rules: %w", err)
     }
-    sch.log.Debug("alert rules fetched", "count", len(q.Result))
-    sch.schedulableAlertRules.set(q.Result)
+    sch.log.Debug("alert rules fetched", "rules_count", len(q.ResultRules), "folders_count", len(q.ResultFoldersTitles))
+    sch.schedulableAlertRules.set(q.ResultRules, q.ResultFoldersTitles)
     return nil
 }

File 3 of 9

@@ -96,7 +96,7 @@ func newAlertRuleInfo(parent context.Context) *alertRuleInfo {
 // - true when message was sent
 // - false when the send operation is stopped
 // the second element contains a dropped message that was sent by a concurrent sender.
-func (a *alertRuleInfo) eval(t time.Time, rule *models.AlertRule) (bool, *evaluation) {
+func (a *alertRuleInfo) eval(eval *evaluation) (bool, *evaluation) {
     // read the channel in unblocking manner to make sure that there is no concurrent send operation.
     var droppedMsg *evaluation
     select {
@@ -105,10 +105,7 @@ func (a *alertRuleInfo) eval(t time.Time, rule *models.AlertRule) (bool, *evaluation) {
     }
     select {
-    case a.evalCh <- &evaluation{
-        scheduledAt: t,
-        rule:        rule,
-    }:
+    case a.evalCh <- eval:
         return true, droppedMsg
     case <-a.ctx.Done():
         return false, droppedMsg
@@ -141,22 +138,24 @@ func (a *alertRuleInfo) update(lastVersion ruleVersion) bool {
 type evaluation struct {
     scheduledAt time.Time
     rule        *models.AlertRule
+    folderTitle string
 }
 type alertRulesRegistry struct {
     rules        map[models.AlertRuleKey]*models.AlertRule
-    mu           sync.Mutex
+    folderTitles map[string]string
+    mu           sync.Mutex
 }
 // all returns all rules in the registry.
-func (r *alertRulesRegistry) all() []*models.AlertRule {
+func (r *alertRulesRegistry) all() ([]*models.AlertRule, map[string]string) {
     r.mu.Lock()
     defer r.mu.Unlock()
     result := make([]*models.AlertRule, 0, len(r.rules))
     for _, rule := range r.rules {
         result = append(result, rule)
     }
-    return result
+    return result, r.folderTitles
 }
 func (r *alertRulesRegistry) get(k models.AlertRuleKey) *models.AlertRule {
@@ -166,13 +165,15 @@ func (r *alertRulesRegistry) get(k models.AlertRuleKey) *models.AlertRule {
 }
 // set replaces all rules in the registry.
-func (r *alertRulesRegistry) set(rules []*models.AlertRule) {
+func (r *alertRulesRegistry) set(rules []*models.AlertRule, folders map[string]string) {
     r.mu.Lock()
     defer r.mu.Unlock()
     r.rules = make(map[models.AlertRuleKey]*models.AlertRule)
     for _, rule := range rules {
         r.rules[rule.GetKey()] = rule
     }
+    // return the map as is without copying because it is not mutated
+    r.folderTitles = folders
 }
 // update inserts or replaces a rule in the registry.
@@ -194,3 +195,22 @@ func (r *alertRulesRegistry) del(k models.AlertRuleKey) (*models.AlertRule, bool) {
     }
     return rule, ok
 }
+func (r *alertRulesRegistry) isEmpty() bool {
+    r.mu.Lock()
+    defer r.mu.Unlock()
+    return len(r.rules) == 0
+}
+func (r *alertRulesRegistry) needsUpdate(keys []models.AlertRuleKeyWithVersion) bool {
+    if len(r.rules) != len(keys) {
+        return true
+    }
+    for _, key := range keys {
+        rule, ok := r.rules[key.AlertRuleKey]
+        if !ok || rule.Version != key.Version {
+            return true
+        }
+    }
+    return false
+}

File 4 of 9

@@ -13,6 +13,7 @@ import (
     "github.com/stretchr/testify/require"
     "github.com/grafana/grafana/pkg/services/ngalert/models"
+    "github.com/grafana/grafana/pkg/util"
 )
 func TestSchedule_alertRuleInfo(t *testing.T) {
@@ -91,15 +92,18 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
         r := newAlertRuleInfo(context.Background())
         expected := time.Now()
         resultCh := make(chan evalResponse)
-        rule := models.AlertRuleGen()()
+        data := &evaluation{
+            scheduledAt: expected,
+            rule:        models.AlertRuleGen()(),
+            folderTitle: util.GenerateShortUID(),
+        }
         go func() {
-            result, dropped := r.eval(expected, rule)
+            result, dropped := r.eval(data)
             resultCh <- evalResponse{result, dropped}
         }()
         select {
         case ctx := <-r.evalCh:
-            require.Equal(t, rule, ctx.rule)
-            require.Equal(t, expected, ctx.scheduledAt)
+            require.Equal(t, data, ctx)
             result := <-resultCh
             require.True(t, result.success)
             require.Nilf(t, result.droppedEval, "expected no dropped evaluations but got one")
@@ -113,12 +117,21 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
         time2 := time.UnixMilli(rand.Int63n(math.MaxInt64))
         resultCh1 := make(chan evalResponse)
         resultCh2 := make(chan evalResponse)
-        rule := models.AlertRuleGen()()
+        data := &evaluation{
+            scheduledAt: time1,
+            rule:        models.AlertRuleGen()(),
+            folderTitle: util.GenerateShortUID(),
+        }
+        data2 := &evaluation{
+            scheduledAt: time2,
+            rule:        data.rule,
+            folderTitle: data.folderTitle,
+        }
         wg := sync.WaitGroup{}
         wg.Add(1)
         go func() {
             wg.Done()
-            result, dropped := r.eval(time1, rule)
+            result, dropped := r.eval(data)
             wg.Done()
             resultCh1 <- evalResponse{result, dropped}
         }()
@@ -126,7 +139,7 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
         wg.Add(2) // one when time1 is sent, another when go-routine for time2 has started
         go func() {
             wg.Done()
-            result, dropped := r.eval(time2, rule)
+            result, dropped := r.eval(data2)
             resultCh2 <- evalResponse{result, dropped}
         }()
         wg.Wait() // at this point tick 1 has already been dropped
@@ -147,9 +160,13 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
     t.Run("eval should exit when context is cancelled", func(t *testing.T) {
         r := newAlertRuleInfo(context.Background())
         resultCh := make(chan evalResponse)
-        rule := models.AlertRuleGen()()
+        data := &evaluation{
+            scheduledAt: time.Now(),
+            rule:        models.AlertRuleGen()(),
+            folderTitle: util.GenerateShortUID(),
+        }
         go func() {
-            result, dropped := r.eval(time.Now(), rule)
+            result, dropped := r.eval(data)
             resultCh <- evalResponse{result, dropped}
         }()
         runtime.Gosched()
@@ -173,8 +190,12 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
     t.Run("eval should do nothing", func(t *testing.T) {
         r := newAlertRuleInfo(context.Background())
         r.stop(nil)
-        rule := models.AlertRuleGen()()
-        success, dropped := r.eval(time.Now(), rule)
+        data := &evaluation{
+            scheduledAt: time.Now(),
+            rule:        models.AlertRuleGen()(),
+            folderTitle: util.GenerateShortUID(),
+        }
+        success, dropped := r.eval(data)
         require.False(t, success)
         require.Nilf(t, dropped, "expected no dropped evaluations but got one")
     })
@@ -218,7 +239,11 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
             case 1:
                 r.update(ruleVersion(rand.Int63()))
             case 2:
-                r.eval(time.Now(), models.AlertRuleGen()())
+                r.eval(&evaluation{
+                    scheduledAt: time.Now(),
+                    rule:        models.AlertRuleGen()(),
+                    folderTitle: util.GenerateShortUID(),
+                })
             case 3:
                 r.stop(nil)
             }
@@ -233,25 +258,33 @@ func TestSchedule_alertRuleInfo(t *testing.T) {
 func TestSchedulableAlertRulesRegistry(t *testing.T) {
     r := alertRulesRegistry{rules: make(map[models.AlertRuleKey]*models.AlertRule)}
-    assert.Len(t, r.all(), 0)
+    rules, folders := r.all()
+    assert.Len(t, rules, 0)
+    assert.Len(t, folders, 0)
+    expectedFolders := map[string]string{"test-uid": "test-title"}
     // replace all rules in the registry with foo
-    r.set([]*models.AlertRule{{OrgID: 1, UID: "foo", Version: 1}})
-    assert.Len(t, r.all(), 1)
+    r.set([]*models.AlertRule{{OrgID: 1, UID: "foo", Version: 1}}, expectedFolders)
+    rules, folders = r.all()
+    assert.Len(t, rules, 1)
+    assert.Equal(t, expectedFolders, folders)
     foo := r.get(models.AlertRuleKey{OrgID: 1, UID: "foo"})
     require.NotNil(t, foo)
     assert.Equal(t, models.AlertRule{OrgID: 1, UID: "foo", Version: 1}, *foo)
     // update foo to a newer version
     r.update(&models.AlertRule{OrgID: 1, UID: "foo", Version: 2})
-    assert.Len(t, r.all(), 1)
+    rules, _ = r.all()
+    assert.Len(t, rules, 1)
     foo = r.get(models.AlertRuleKey{OrgID: 1, UID: "foo"})
     require.NotNil(t, foo)
     assert.Equal(t, models.AlertRule{OrgID: 1, UID: "foo", Version: 2}, *foo)
     // update bar which does not exist in the registry
     r.update(&models.AlertRule{OrgID: 1, UID: "bar", Version: 1})
-    assert.Len(t, r.all(), 2)
+    rules, _ = r.all()
+    assert.Len(t, rules, 2)
     foo = r.get(models.AlertRuleKey{OrgID: 1, UID: "foo"})
     require.NotNil(t, foo)
     assert.Equal(t, models.AlertRule{OrgID: 1, UID: "foo", Version: 2}, *foo)
@@ -260,8 +293,10 @@ func TestSchedulableAlertRulesRegistry(t *testing.T) {
     assert.Equal(t, models.AlertRule{OrgID: 1, UID: "bar", Version: 1}, *bar)
     // replace all rules in the registry with baz
-    r.set([]*models.AlertRule{{OrgID: 1, UID: "baz", Version: 1}})
-    assert.Len(t, r.all(), 1)
+    r.set([]*models.AlertRule{{OrgID: 1, UID: "baz", Version: 1}}, nil)
+    rules, folders = r.all()
+    assert.Len(t, rules, 1)
+    assert.Nil(t, folders)
     baz := r.get(models.AlertRuleKey{OrgID: 1, UID: "baz"})
     require.NotNil(t, baz)
     assert.Equal(t, models.AlertRule{OrgID: 1, UID: "baz", Version: 1}, *baz)
@@ -273,7 +308,9 @@ func TestSchedulableAlertRulesRegistry(t *testing.T) {
     assert.True(t, ok)
     require.NotNil(t, deleted)
     assert.Equal(t, *deleted, *baz)
-    assert.Len(t, r.all(), 0)
+    rules, folders = r.all()
+    assert.Len(t, rules, 0)
+    assert.Len(t, folders, 0)
     assert.Nil(t, r.get(models.AlertRuleKey{OrgID: 1, UID: "baz"}))
     // baz cannot be deleted twice

File 5 of 9

@@ -17,8 +17,6 @@ import (
     ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
     "github.com/grafana/grafana/pkg/services/ngalert/state"
     "github.com/grafana/grafana/pkg/services/ngalert/store"
-    "github.com/grafana/grafana/pkg/services/org"
-    "github.com/grafana/grafana/pkg/services/user"
     "github.com/grafana/grafana/pkg/setting"
     "github.com/benbjohnson/clock"
@@ -48,6 +46,12 @@ type AlertsSender interface {
     Send(key ngmodels.AlertRuleKey, alerts definitions.PostableAlerts)
 }
+// RulesStore is a store that provides alert rules for scheduling
+type RulesStore interface {
+    GetAlertRulesKeysForScheduling(ctx context.Context) ([]ngmodels.AlertRuleKeyWithVersion, error)
+    GetAlertRulesForScheduling(ctx context.Context, query *ngmodels.GetAlertRulesForSchedulingQuery) error
+}
 type schedule struct {
     // base tick rate (fastest possible configured check)
     baseInterval time.Duration
@@ -75,7 +79,7 @@ type schedule struct {
     evaluator eval.Evaluator
-    ruleStore store.RuleStore
+    ruleStore RulesStore
     stateManager *state.Manager
@@ -102,7 +106,7 @@ type SchedulerCfg struct {
     EvalAppliedFunc func(ngmodels.AlertRuleKey, time.Time)
     StopAppliedFunc func(ngmodels.AlertRuleKey)
     Evaluator       eval.Evaluator
-    RuleStore       store.RuleStore
+    RuleStore       RulesStore
     InstanceStore   store.InstanceStore
     Metrics         *metrics.Scheduler
     AlertSender     AlertsSender
@@ -172,7 +176,7 @@ func (sch *schedule) DeleteAlertRule(keys ...ngmodels.AlertRuleKey) {
         ruleInfo.stop(errRuleDeleted)
     }
     // Our best bet at this point is that we update the metrics with what we hope to schedule in the next tick.
-    alertRules := sch.schedulableAlertRules.all()
+    alertRules, _ := sch.schedulableAlertRules.all()
     sch.metrics.SchedulableAlertRules.Set(float64(len(alertRules)))
     sch.metrics.SchedulableAlertRulesHash.Set(float64(hashUIDs(alertRules)))
 }
@@ -194,7 +198,7 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
             if err := sch.updateSchedulableAlertRules(ctx); err != nil {
                 sch.log.Error("scheduler failed to update alert rules", "err", err)
             }
-            alertRules := sch.schedulableAlertRules.all()
+            alertRules, folderTitles := sch.schedulableAlertRules.all()
             // registeredDefinitions is a map used for finding deleted alert rules
             // initially it is assigned to all known alert rules from the previous cycle
@@ -209,10 +213,11 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
             type readyToRunItem struct {
                 ruleInfo *alertRuleInfo
-                rule     *ngmodels.AlertRule
+                evaluation
             }
             readyToRun := make([]readyToRunItem, 0)
+            missingFolder := make(map[string][]string)
             for _, item := range alertRules {
                 key := item.GetKey()
                 ruleInfo, newRoutine := sch.registry.getOrCreateInfo(ctx, key)
@@ -240,13 +245,30 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
                 itemFrequency := item.IntervalSeconds / int64(sch.baseInterval.Seconds())
                 if item.IntervalSeconds != 0 && tickNum%itemFrequency == 0 {
-                    readyToRun = append(readyToRun, readyToRunItem{ruleInfo: ruleInfo, rule: item})
+                    var folderTitle string
+                    if !sch.disableGrafanaFolder {
+                        title, ok := folderTitles[item.NamespaceUID]
+                        if ok {
+                            folderTitle = title
+                        } else {
+                            missingFolder[item.NamespaceUID] = append(missingFolder[item.NamespaceUID], item.UID)
+                        }
+                    }
+                    readyToRun = append(readyToRun, readyToRunItem{ruleInfo: ruleInfo, evaluation: evaluation{
+                        scheduledAt: tick,
+                        rule:        item,
+                        folderTitle: folderTitle,
+                    }})
                 }
                 // remove the alert rule from the registered alert rules
                 delete(registeredDefinitions, key)
             }
+            if len(missingFolder) > 0 { // if this happens then there can be problems with fetching folders from the database.
+                sch.log.Warn("unable to find obtain folder titles for some rules", "folder_to_rule_map", missingFolder)
+            }
             var step int64 = 0
             if len(readyToRun) > 0 {
                 step = sch.baseInterval.Nanoseconds() / int64(len(readyToRun))
@@ -257,7 +279,7 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
                 time.AfterFunc(time.Duration(int64(i)*step), func() {
                     key := item.rule.GetKey()
-                    success, dropped := item.ruleInfo.eval(tick, item.rule)
+                    success, dropped := item.ruleInfo.eval(&item.evaluation)
                     if !success {
                         sch.log.Debug("scheduled evaluation was canceled because evaluation routine was stopped", "uid", key.UID, "org", key.OrgID, "time", tick)
                         return
@@ -303,7 +325,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, ...) error {
         }
     }
-    evaluate := func(ctx context.Context, extraLabels map[string]string, attempt int64, e *evaluation) {
+    evaluate := func(ctx context.Context, attempt int64, e *evaluation) {
         logger := logger.New("version", e.rule.Version, "attempt", attempt, "now", e.scheduledAt)
         start := sch.clock.Now()
@@ -321,7 +343,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, ...) error {
             logger.Debug("skip updating the state because the context has been cancelled")
             return
         }
-        processedStates := sch.stateManager.ProcessEvalResults(ctx, e.scheduledAt, e.rule, results, extraLabels)
+        processedStates := sch.stateManager.ProcessEvalResults(ctx, e.scheduledAt, e.rule, results, sch.getRuleExtraLabels(e))
         alerts := FromAlertStateToPostableAlerts(processedStates, sch.stateManager, sch.appURL)
         if len(alerts.PostableAlerts) > 0 {
             sch.alertsSender.Send(key, alerts)
@@ -342,7 +364,6 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, ...) error {
     evalRunning := false
     var currentRuleVersion int64 = 0
-    var extraLabels map[string]string
     defer sch.stopApplied(key)
     for {
         select {
@@ -384,14 +405,9 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertRuleKey, ...) error {
                         logger.Debug("got a new version of alert rule. Clear up the state and refresh extra labels", "version", currentRuleVersion, "new_version", newVersion)
                         clearState()
                     }
-                    newLabels, err := sch.getRuleExtraLabels(grafanaCtx, ctx.rule)
-                    if err != nil {
-                        return err
-                    }
                     currentRuleVersion = newVersion
-                    extraLabels = newLabels
                 }
-                evaluate(grafanaCtx, extraLabels, attempt, ctx)
+                evaluate(grafanaCtx, attempt, ctx)
                 return nil
             })
             if err != nil {
@@ -437,26 +453,15 @@ func (sch *schedule) stopApplied(alertDefKey ngmodels.AlertRuleKey) {
     sch.stopAppliedFunc(alertDefKey)
 }
-func (sch *schedule) getRuleExtraLabels(ctx context.Context, alertRule *ngmodels.AlertRule) (map[string]string, error) {
+func (sch *schedule) getRuleExtraLabels(evalCtx *evaluation) map[string]string {
     extraLabels := make(map[string]string, 4)
-    extraLabels[ngmodels.NamespaceUIDLabel] = alertRule.NamespaceUID
-    extraLabels[prometheusModel.AlertNameLabel] = alertRule.Title
-    extraLabels[ngmodels.RuleUIDLabel] = alertRule.UID
-    user := &user.SignedInUser{
-        UserID:  0,
-        OrgRole: org.RoleAdmin,
-        OrgID:   alertRule.OrgID,
-    }
+    extraLabels[ngmodels.NamespaceUIDLabel] = evalCtx.rule.NamespaceUID
+    extraLabels[prometheusModel.AlertNameLabel] = evalCtx.rule.Title
+    extraLabels[ngmodels.RuleUIDLabel] = evalCtx.rule.UID
     if !sch.disableGrafanaFolder {
-        folder, err := sch.ruleStore.GetNamespaceByUID(ctx, alertRule.NamespaceUID, alertRule.OrgID, user)
-        if err != nil {
-            sch.log.Error("failed to fetch alert rule namespace", "err", err, "uid", alertRule.UID, "org", alertRule.OrgID, "namespace_uid", alertRule.NamespaceUID)
-            return nil, err
-        }
-        extraLabels[ngmodels.FolderTitleLabel] = folder.Title
+        extraLabels[ngmodels.FolderTitleLabel] = evalCtx.folderTitle
     }
-    return extraLabels, nil
+    return extraLabels
 }

File 6 of 9

@@ -63,7 +63,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
         rule := models.AlertRuleGen(withQueryForState(t, evalState))()
         ruleStore.PutRule(context.Background(), rule)
+        folder, _ := ruleStore.GetNamespaceByUID(context.Background(), rule.NamespaceUID, rule.OrgID, nil)
         go func() {
             ctx, cancel := context.WithCancel(context.Background())
             t.Cleanup(cancel)
@@ -75,6 +75,7 @@ func TestSchedule_ruleRoutine(t *testing.T) {
         evalChan <- &evaluation{
             scheduledAt: expectedTime,
             rule:        rule,
+            folderTitle: folder.Title,
         }
         actualTime := waitForTimeChannel(t, evalAppliedChan)
@@ -82,7 +83,6 @@ func TestSchedule_ruleRoutine(t *testing.T) {
         t.Run("it should add extra labels", func(t *testing.T) {
             states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
-            folder, _ := ruleStore.GetNamespaceByUID(context.Background(), rule.NamespaceUID, rule.OrgID, nil)
             for _, s := range states {
                 assert.Equal(t, rule.UID, s.Labels[models.RuleUIDLabel])
                 assert.Equal(t, rule.NamespaceUID, s.Labels[models.NamespaceUIDLabel])

File 7 of 9

@@ -44,7 +44,6 @@ type RuleStore interface {
     DeleteAlertInstancesByRuleUID(ctx context.Context, orgID int64, ruleUID string) error
     GetAlertRuleByUID(ctx context.Context, query *ngmodels.GetAlertRuleByUIDQuery) error
     GetAlertRulesGroupByRuleUID(ctx context.Context, query *ngmodels.GetAlertRulesGroupByRuleUIDQuery) error
-    GetAlertRulesForScheduling(ctx context.Context, query *ngmodels.GetAlertRulesForSchedulingQuery) error
     ListAlertRules(ctx context.Context, query *ngmodels.ListAlertRulesQuery) error
     // GetRuleGroups returns the unique rule groups across all organizations.
     GetRuleGroups(ctx context.Context, query *ngmodels.ListRuleGroupsQuery) error
@@ -416,23 +415,72 @@ func (st DBstore) GetNamespaceByUID(ctx context.Context, uid string, orgID int64, ...) {
     return folder, nil
 }
-// GetAlertRulesForScheduling returns a short version of all alert rules except those that belong to an excluded list of organizations
-func (st DBstore) GetAlertRulesForScheduling(ctx context.Context, query *ngmodels.GetAlertRulesForSchedulingQuery) error {
-    return st.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
-        alerts := make([]*ngmodels.AlertRule, 0)
-        q := sess.Table(ngmodels.AlertRule{})
-        if len(st.Cfg.DisabledOrgs) > 0 {
-            excludeOrgs := make([]interface{}, 0, len(st.Cfg.DisabledOrgs))
-            for orgID := range st.Cfg.DisabledOrgs {
-                excludeOrgs = append(excludeOrgs, orgID)
-            }
-            q = q.NotIn("org_id", excludeOrgs...)
-        }
-        q = q.Asc("namespace_uid", "rule_group", "rule_group_idx", "id")
-        if err := q.Find(&alerts); err != nil {
-            return err
-        }
-        query.Result = alerts
-        return nil
-    })
-}
+func (st DBstore) getFilterByOrgsString() (string, []interface{}) {
+    if len(st.Cfg.DisabledOrgs) == 0 {
+        return "", nil
+    }
+    builder := strings.Builder{}
+    builder.WriteString("org_id NOT IN(")
+    idx := len(st.Cfg.DisabledOrgs)
+    args := make([]interface{}, 0, len(st.Cfg.DisabledOrgs))
+    for orgId := range st.Cfg.DisabledOrgs {
+        args = append(args, orgId)
+        builder.WriteString("?")
+        idx--
+        if idx == 0 {
+            builder.WriteString(")")
+            break
+        }
+        builder.WriteString(",")
+    }
+    return builder.String(), args
+}
+func (st DBstore) GetAlertRulesKeysForScheduling(ctx context.Context) ([]ngmodels.AlertRuleKeyWithVersion, error) {
+    var result []ngmodels.AlertRuleKeyWithVersion
+    err := st.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
+        alertRulesSql := "SELECT org_id, uid, version FROM alert_rule"
+        filter, args := st.getFilterByOrgsString()
+        if filter != "" {
+            alertRulesSql += " WHERE " + filter
+        }
+        if err := sess.SQL(alertRulesSql, args...).Find(&result); err != nil {
+            return err
+        }
+        return nil
+    })
+    return result, err
+}
+// GetAlertRulesForScheduling returns a short version of all alert rules except those that belong to an excluded list of organizations
+func (st DBstore) GetAlertRulesForScheduling(ctx context.Context, query *ngmodels.GetAlertRulesForSchedulingQuery) error {
+    var folders []struct {
+        Uid   string
+        Title string
+    }
+    var rules []*ngmodels.AlertRule
+    return st.SQLStore.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
+        foldersSql := "SELECT D.uid, D.title FROM dashboard AS D WHERE is_folder = 1 AND EXISTS (SELECT 1 FROM alert_rule AS A WHERE D.uid = A.namespace_uid)"
+        alertRulesSql := "SELECT * FROM alert_rule"
+        filter, args := st.getFilterByOrgsString()
+        if filter != "" {
+            foldersSql += " AND " + filter
+            alertRulesSql += " WHERE " + filter
+        }
+        if err := sess.SQL(alertRulesSql, args...).Find(&rules); err != nil {
+            return fmt.Errorf("failed to fetch alert rules: %w", err)
+        }
+        query.ResultRules = rules
+        if query.PopulateFolders {
+            if err := sess.SQL(foldersSql, args...).Find(&folders); err != nil {
+                return fmt.Errorf("failed to fetch a list of folders that contain alert rules: %w", err)
+            }
+            query.ResultFoldersTitles = make(map[string]string, len(folders))
+            for _, folder := range folders {
+                query.ResultFoldersTitles[folder.Uid] = folder.Title
+            }
+        }
+        return nil
+    })
 }

File 8 of 9

@@ -7,6 +7,7 @@ import (
     "testing"
     "time"
+    "github.com/stretchr/testify/assert"
     "github.com/stretchr/testify/require"
     "golang.org/x/exp/rand"
@@ -92,3 +93,55 @@ func withIntervalMatching(baseInterval time.Duration) func(*models.AlertRule) {
         rule.For = time.Duration(rule.IntervalSeconds*rand.Int63n(9)+1) * time.Second
     }
 }
+func Test_getFilterByOrgsString(t *testing.T) {
+    testCases := []struct {
+        testName       string
+        orgs           map[int64]struct{}
+        expectedFilter string
+        expectedArgs   []interface{}
+    }{
+        {
+            testName:       "should return empty string if map is empty",
+            orgs:           map[int64]struct{}{},
+            expectedFilter: "",
+            expectedArgs:   nil,
+        },
+        {
+            testName:       "should return empty string if map is nil",
+            orgs:           nil,
+            expectedFilter: "",
+            expectedArgs:   nil,
+        },
+        {
+            testName: "should return correct filter if single element",
+            orgs: map[int64]struct{}{
+                1: {},
+            },
+            expectedFilter: "org_id NOT IN(?)",
+            expectedArgs:   []interface{}{int64(1)},
+        },
+        {
+            testName: "should return correct filter if many elements",
+            orgs: map[int64]struct{}{
+                1: {},
+                2: {},
+                3: {},
+            },
+            expectedFilter: "org_id NOT IN(?,?,?)",
+            expectedArgs:   []interface{}{int64(1), int64(2), int64(3)},
+        },
+    }
+    for _, testCase := range testCases {
+        t.Run(testCase.testName, func(t *testing.T) {
+            store := &DBstore{
+                Cfg: setting.UnifiedAlertingSettings{
+                    DisabledOrgs: testCase.orgs,
+                },
+            }
+            filter, args := store.getFilterByOrgsString()
+            assert.Equal(t, testCase.expectedFilter, filter)
+            assert.ElementsMatch(t, testCase.expectedArgs, args)
+        })
+    }
+}

File 9 of 9

@@ -174,6 +174,24 @@ func (f *FakeRuleStore) GetAlertRulesGroupByRuleUID(_ context.Context, q *models.GetAlertRulesGroupByRuleUIDQuery) error {
     }
     return nil
 }
+func (f *FakeRuleStore) GetAlertRulesKeysForScheduling(_ context.Context) ([]models.AlertRuleKeyWithVersion, error) {
+    f.mtx.Lock()
+    defer f.mtx.Unlock()
+    f.RecordedOps = append(f.RecordedOps, GenericRecordedQuery{
+        Name:   "GetAlertRulesKeysForScheduling",
+        Params: []interface{}{},
+    })
+    result := make([]models.AlertRuleKeyWithVersion, 0, len(f.Rules))
+    for _, rules := range f.Rules {
+        for _, rule := range rules {
+            result = append(result, models.AlertRuleKeyWithVersion{
+                Version:      rule.Version,
+                AlertRuleKey: rule.GetKey(),
+            })
+        }
+    }
+    return result, nil
+}
 // For now, we're not implementing namespace filtering.
 func (f *FakeRuleStore) GetAlertRulesForScheduling(_ context.Context, q *models.GetAlertRulesForSchedulingQuery) error {
@@ -183,8 +201,23 @@ func (f *FakeRuleStore) GetAlertRulesForScheduling(_ context.Context, q *models.GetAlertRulesForSchedulingQuery) error {
     if err := f.Hook(*q); err != nil {
         return err
     }
+    q.ResultFoldersTitles = make(map[string]string)
     for _, rules := range f.Rules {
-        q.Result = append(q.Result, rules...)
+        for _, rule := range rules {
+            q.ResultRules = append(q.ResultRules, rule)
+            if !q.PopulateFolders {
+                continue
+            }
+            if _, ok := q.ResultFoldersTitles[rule.NamespaceUID]; !ok {
+                if folders, ok := f.Folders[rule.OrgID]; ok {
+                    for _, folder := range folders {
+                        if folder.Uid == rule.NamespaceUID {
+                            q.ResultFoldersTitles[rule.NamespaceUID] = folder.Title
+                        }
+                    }
+                }
+            }
+        }
     }
     return nil
 }