Alerting: Update state manager to accept reserved labels (#52189)

* add tests for cache getOrCreate
* update ProcessEvalResults to accept extra labels
* extract to getRuleExtraLabels
* move populating of constant rule labels to extra labels
This commit is contained in:
Yuriy Tseretyan 2022-07-14 15:59:59 -04:00 committed by GitHub
parent 19cf9fa87d
commit e5e8747ee9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 288 additions and 77 deletions

View File

@ -6,6 +6,8 @@ import (
"math/rand"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/util"
)
@ -38,19 +40,11 @@ func AlertRuleGen(mutators ...AlertRuleMutator) func() *AlertRule {
var annotations map[string]string = nil
if rand.Int63()%2 == 0 {
qty := rand.Intn(5)
annotations = make(map[string]string, qty)
for i := 0; i < qty; i++ {
annotations[util.GenerateShortUID()] = util.GenerateShortUID()
}
annotations = GenerateAlertLabels(rand.Intn(5), "ann-")
}
var labels map[string]string = nil
if rand.Int63()%2 == 0 {
qty := rand.Intn(5)
labels = make(map[string]string, qty)
for i := 0; i < qty; i++ {
labels[util.GenerateShortUID()] = util.GenerateShortUID()
}
labels = GenerateAlertLabels(rand.Intn(5), "lbl-")
}
var dashUID *string = nil
@ -91,6 +85,11 @@ func AlertRuleGen(mutators ...AlertRuleMutator) func() *AlertRule {
}
}
// WithNotEmptyLabels returns a mutator that replaces the rule's labels with
// `count` generated labels whose keys and values carry the given prefix.
func WithNotEmptyLabels(count int, prefix string) AlertRuleMutator {
	return func(r *AlertRule) {
		r.Labels = GenerateAlertLabels(count, prefix)
	}
}
func WithUniqueID() AlertRuleMutator {
usedID := make(map[int64]struct{})
return func(rule *AlertRule) {
@ -133,6 +132,14 @@ func WithSequentialGroupIndex() AlertRuleMutator {
}
}
// GenerateAlertLabels creates `count` random labels. Every key and value is
// prefixed with `prefix` so different label sets remain distinguishable in tests.
func GenerateAlertLabels(count int, prefix string) data.Labels {
	lbls := make(data.Labels, count)
	for i := 0; i < count; i++ {
		key := prefix + "key-" + util.GenerateShortUID()
		lbls[key] = prefix + "value-" + util.GenerateShortUID()
	}
	return lbls
}
func GenerateAlertQuery() AlertQuery {
f := rand.Intn(10) + 5
t := rand.Intn(f)

View File

@ -6,6 +6,8 @@ import (
"net/url"
"time"
prometheusModel "github.com/prometheus/common/model"
"github.com/grafana/grafana/pkg/bus"
"github.com/grafana/grafana/pkg/events"
"github.com/grafana/grafana/pkg/infra/log"
@ -350,42 +352,24 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
sch.alertsSender.Send(key, expiredAlerts)
}
updateRule := func(ctx context.Context, oldRule *ngmodels.AlertRule) (*ngmodels.AlertRule, error) {
updateRule := func(ctx context.Context, oldRule *ngmodels.AlertRule) (*ngmodels.AlertRule, map[string]string, error) {
q := ngmodels.GetAlertRuleByUIDQuery{OrgID: key.OrgID, UID: key.UID}
err := sch.ruleStore.GetAlertRuleByUID(ctx, &q)
if err != nil {
logger.Error("failed to fetch alert rule", "err", err)
return nil, err
return nil, nil, err
}
if oldRule != nil && oldRule.Version < q.Result.Version {
clearState()
}
user := &models.SignedInUser{
UserId: 0,
OrgRole: models.ROLE_ADMIN,
OrgId: key.OrgID,
newLabels, err := sch.getRuleExtraLabels(ctx, q.Result)
if err != nil {
return nil, nil, err
}
if !sch.disableGrafanaFolder {
folder, err := sch.ruleStore.GetNamespaceByUID(ctx, q.Result.NamespaceUID, q.Result.OrgID, user)
if err != nil {
logger.Error("failed to fetch alert rule namespace", "err", err)
return nil, err
}
if q.Result.Labels == nil {
q.Result.Labels = make(map[string]string)
} else if val, ok := q.Result.Labels[ngmodels.FolderTitleLabel]; ok {
logger.Warn("alert rule contains protected label, value will be overwritten", "label", ngmodels.FolderTitleLabel, "value", val)
}
q.Result.Labels[ngmodels.FolderTitleLabel] = folder.Title
}
return q.Result, nil
return q.Result, newLabels, nil
}
evaluate := func(ctx context.Context, r *ngmodels.AlertRule, attempt int64, e *evaluation) {
evaluate := func(ctx context.Context, r *ngmodels.AlertRule, extraLabels map[string]string, attempt int64, e *evaluation) {
logger := logger.New("version", r.Version, "attempt", attempt, "now", e.scheduledAt)
start := sch.clock.Now()
@ -400,7 +384,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
logger.Debug("alert rule evaluated", "results", results, "duration", dur)
}
processedStates := sch.stateManager.ProcessEvalResults(ctx, e.scheduledAt, r, results)
processedStates := sch.stateManager.ProcessEvalResults(ctx, e.scheduledAt, r, results, extraLabels)
sch.saveAlertStates(ctx, processedStates)
alerts := FromAlertStateToPostableAlerts(processedStates, sch.stateManager, sch.appURL)
sch.alertsSender.Send(key, alerts)
@ -420,6 +404,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
evalRunning := false
var currentRule *ngmodels.AlertRule
var extraLabels map[string]string
defer sch.stopApplied(key)
for {
select {
@ -427,12 +412,13 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
case <-updateCh:
logger.Info("fetching new version of the rule")
err := retryIfError(func(attempt int64) error {
newRule, err := updateRule(grafanaCtx, currentRule)
newRule, newExtraLabels, err := updateRule(grafanaCtx, currentRule)
if err != nil {
return err
}
logger.Debug("new alert rule version fetched", "title", newRule.Title, "version", newRule.Version)
currentRule = newRule
extraLabels = newExtraLabels
return nil
})
if err != nil {
@ -458,14 +444,15 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
err := retryIfError(func(attempt int64) error {
// fetch latest alert rule version
if currentRule == nil || currentRule.Version < ctx.version {
newRule, err := updateRule(grafanaCtx, currentRule)
newRule, newExtraLabels, err := updateRule(grafanaCtx, currentRule)
if err != nil {
return err
}
currentRule = newRule
extraLabels = newExtraLabels
logger.Debug("new alert rule version fetched", "title", newRule.Title, "version", newRule.Version)
}
evaluate(grafanaCtx, currentRule, attempt, ctx)
evaluate(grafanaCtx, currentRule, extraLabels, attempt, ctx)
return nil
})
if err != nil {
@ -535,3 +522,27 @@ func (sch *schedule) stopApplied(alertDefKey ngmodels.AlertRuleKey) {
sch.stopAppliedFunc(alertDefKey)
}
// getRuleExtraLabels builds the reserved labels attached to every state
// produced for the given rule: the rule UID, its namespace UID, the
// Prometheus alert-name label, and — unless the Grafana folder label is
// disabled — the title of the folder the rule belongs to.
// Returns an error when the folder lookup fails.
func (sch *schedule) getRuleExtraLabels(ctx context.Context, alertRule *ngmodels.AlertRule) (map[string]string, error) {
	labels := map[string]string{
		ngmodels.NamespaceUIDLabel:     alertRule.NamespaceUID,
		prometheusModel.AlertNameLabel: alertRule.Title,
		ngmodels.RuleUIDLabel:          alertRule.UID,
	}
	if !sch.disableGrafanaFolder {
		// The lookup is performed with a synthetic org-admin identity;
		// the scheduler has no end-user of its own.
		adminUser := &models.SignedInUser{
			UserId:  0,
			OrgRole: models.ROLE_ADMIN,
			OrgId:   alertRule.OrgID,
		}
		folder, err := sch.ruleStore.GetNamespaceByUID(ctx, alertRule.NamespaceUID, alertRule.OrgID, adminUser)
		if err != nil {
			sch.log.Error("failed to fetch alert rule namespace", "err", err, "uid", alertRule.UID, "org", alertRule.OrgID, "namespace_uid", alertRule.NamespaceUID)
			return nil, err
		}
		labels[ngmodels.FolderTitleLabel] = folder.Title
	}
	return labels, nil
}

View File

@ -398,19 +398,11 @@ func TestSchedule_ruleRoutine(t *testing.T) {
require.Equal(t, rule.OrgID, queries[0].OrgID)
})
t.Run("it should get rule folder title from database and attach as label", func(t *testing.T) {
queries := make([]store.GenericRecordedQuery, 0)
for _, op := range ruleStore.RecordedOps {
switch q := op.(type) {
case store.GenericRecordedQuery:
queries = append(queries, q)
}
states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
for _, s := range states {
require.NotEmptyf(t, s.Labels[models.FolderTitleLabel], "Expected a non-empty title in label %s", models.FolderTitleLabel)
require.Equal(t, s.Labels[models.FolderTitleLabel], ruleStore.Folders[rule.OrgID][0].Title)
}
require.NotEmptyf(t, queries, "Expected a %T request to rule store but nothing was recorded", store.GenericRecordedQuery{})
require.Len(t, queries, 1, "Expected exactly one request of %T but got %d", store.GenericRecordedQuery{}, len(queries))
require.Equal(t, rule.NamespaceUID, queries[0].Params[1])
require.Equal(t, rule.OrgID, queries[0].Params[0])
require.NotEmptyf(t, rule.Labels[models.FolderTitleLabel], "Expected a non-empty title in label %s", models.FolderTitleLabel)
require.Equal(t, rule.Labels[models.FolderTitleLabel], ruleStore.Folders[rule.OrgID][0].Title)
})
t.Run("it should process evaluation results via state manager", func(t *testing.T) {
// TODO rewrite when we are able to mock/fake state manager

View File

@ -13,7 +13,6 @@ import (
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
ngModels "github.com/grafana/grafana/pkg/services/ngalert/models"
prometheusModel "github.com/prometheus/common/model"
)
type cache struct {
@ -33,18 +32,42 @@ func newCache(logger log.Logger, metrics *metrics.State, externalURL *url.URL) *
}
}
func (c *cache) getOrCreate(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result) *State {
func (c *cache) getOrCreate(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result, extraLabels data.Labels) *State {
c.mtxStates.Lock()
defer c.mtxStates.Unlock()
// clone the labels so we don't change eval.Result
labels := result.Instance.Copy()
attachRuleLabels(labels, alertRule)
ruleLabels, annotations := c.expandRuleLabelsAndAnnotations(ctx, alertRule, labels, result)
ruleLabels, annotations := c.expandRuleLabelsAndAnnotations(ctx, alertRule, result, extraLabels)
// if duplicate labels exist, alertRule label will take precedence
lbs := mergeLabels(ruleLabels, result.Instance)
attachRuleLabels(lbs, alertRule)
lbs := make(data.Labels, len(extraLabels)+len(ruleLabels)+len(result.Instance))
dupes := make(data.Labels)
for key, val := range extraLabels {
lbs[key] = val
}
for key, val := range ruleLabels {
_, ok := lbs[key]
// if duplicate labels exist, reserved label will take precedence
if ok {
dupes[key] = val
} else {
lbs[key] = val
}
}
if len(dupes) > 0 {
c.log.Warn("rule declares one or many reserved labels. Those rules labels will be ignored", "labels", dupes)
}
dupes = make(data.Labels)
for key, val := range result.Instance {
_, ok := lbs[key]
// if duplicate labels exist, reserved or alert rule label will take precedence
if ok {
dupes[key] = val
} else {
lbs[key] = val
}
}
if len(dupes) > 0 {
c.log.Warn("evaluation result contains either reserved labels or labels declared in the rules. Those labels from the result will be ignored", "labels", dupes)
}
il := ngModels.InstanceLabels(lbs)
id, err := il.StringKey()
@ -93,17 +116,14 @@ func (c *cache) getOrCreate(ctx context.Context, alertRule *ngModels.AlertRule,
return newState
}
// attachRuleLabels stamps the rule's identifying labels (rule UID, namespace
// UID, and the Prometheus alert name) onto m, overwriting existing entries.
func attachRuleLabels(m map[string]string, alertRule *ngModels.AlertRule) {
	for key, value := range map[string]string{
		ngModels.RuleUIDLabel:          alertRule.UID,
		ngModels.NamespaceUIDLabel:     alertRule.NamespaceUID,
		prometheusModel.AlertNameLabel: alertRule.Title,
	} {
		m[key] = value
	}
}
func (c *cache) expandRuleLabelsAndAnnotations(ctx context.Context, alertRule *ngModels.AlertRule, alertInstance eval.Result, extraLabels data.Labels) (data.Labels, data.Labels) {
// use labels from the result and extra labels to expand the labels and annotations declared by the rule
templateLabels := mergeLabels(extraLabels, alertInstance.Instance)
func (c *cache) expandRuleLabelsAndAnnotations(ctx context.Context, alertRule *ngModels.AlertRule, labels map[string]string, alertInstance eval.Result) (map[string]string, map[string]string) {
expand := func(original map[string]string) map[string]string {
expanded := make(map[string]string, len(original))
for k, v := range original {
ev, err := expandTemplate(ctx, alertRule.Title, v, labels, alertInstance, c.externalURL)
ev, err := expandTemplate(ctx, alertRule.Title, v, templateLabels, alertInstance, c.externalURL)
expanded[k] = ev
if err != nil {
c.log.Error("error in expanding template", "name", k, "value", v, "err", err.Error())
@ -204,7 +224,7 @@ func (c *cache) recordMetrics() {
// if duplicate labels exist, keep the value from the first set
func mergeLabels(a, b data.Labels) data.Labels {
newLbs := data.Labels{}
newLbs := make(data.Labels, len(a)+len(b))
for k, v := range a {
newLbs[k] = v
}

View File

@ -0,0 +1,169 @@
package state
import (
"context"
"fmt"
"net/url"
"testing"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/util"
)
// Test_getOrCreate verifies how cache.getOrCreate merges the three label
// sources (extra labels, rule labels, evaluation-result labels), the
// precedence between them on key conflicts, and template expansion of rule
// labels/annotations against the merged label set.
func Test_getOrCreate(t *testing.T) {
	c := newCache(log.New("test"), &metrics.State{}, &url.URL{
		Scheme: "http",
		Host:   "localhost:3000",
		Path:   "/test",
	})
	generateRule := models.AlertRuleGen(models.WithNotEmptyLabels(5, "rule-"))
	t.Run("should combine all labels", func(t *testing.T) {
		rule := generateRule()
		extraLabels := models.GenerateAlertLabels(5, "extra-")
		result := eval.Result{
			Instance: models.GenerateAlertLabels(5, "result-"),
		}
		state := c.getOrCreate(context.Background(), rule, result, extraLabels)
		// Distinct prefixes mean no key collisions, so the state should carry
		// the exact union of the three sets.
		assert.Len(t, state.Labels, len(extraLabels)+len(rule.Labels)+len(result.Instance))
		for key, expected := range extraLabels {
			assert.Equal(t, expected, state.Labels[key])
		}
		for key, expected := range rule.Labels {
			assert.Equal(t, expected, state.Labels[key])
		}
		for key, expected := range result.Instance {
			assert.Equal(t, expected, state.Labels[key])
		}
	})
	t.Run("extra labels should take precedence over rule and result labels", func(t *testing.T) {
		rule := generateRule()
		extraLabels := models.GenerateAlertLabels(2, "extra-")
		result := eval.Result{
			Instance: models.GenerateAlertLabels(5, "result-"),
		}
		// Force conflicts on every extra-label key.
		for key := range extraLabels {
			rule.Labels[key] = "rule-" + util.GenerateShortUID()
			result.Instance[key] = "result-" + util.GenerateShortUID()
		}
		state := c.getOrCreate(context.Background(), rule, result, extraLabels)
		for key, expected := range extraLabels {
			require.Equal(t, expected, state.Labels[key])
		}
	})
	t.Run("rule labels should take precedence over result labels", func(t *testing.T) {
		rule := generateRule()
		extraLabels := models.GenerateAlertLabels(2, "extra-")
		result := eval.Result{
			Instance: models.GenerateAlertLabels(5, "result-"),
		}
		// Force conflicts on every rule-label key.
		for key := range rule.Labels {
			result.Instance[key] = "result-" + util.GenerateShortUID()
		}
		state := c.getOrCreate(context.Background(), rule, result, extraLabels)
		for key, expected := range rule.Labels {
			require.Equal(t, expected, state.Labels[key])
		}
	})
	t.Run("rule labels should be able to be expanded with result and extra labels", func(t *testing.T) {
		result := eval.Result{
			Instance: models.GenerateAlertLabels(5, "result-"),
		}
		rule := generateRule()
		extraLabels := models.GenerateAlertLabels(2, "extra-")
		// Each rule label is a template that echoes one extra/result label.
		labelTemplates := make(data.Labels)
		for key := range extraLabels {
			labelTemplates["rule-"+key] = fmt.Sprintf("{{ with (index .Labels \"%s\") }}{{.}}{{end}}", key)
		}
		for key := range result.Instance {
			labelTemplates["rule-"+key] = fmt.Sprintf("{{ with (index .Labels \"%s\") }}{{.}}{{end}}", key)
		}
		rule.Labels = labelTemplates
		state := c.getOrCreate(context.Background(), rule, result, extraLabels)
		for key, expected := range extraLabels {
			assert.Equal(t, expected, state.Labels["rule-"+key])
		}
		for key, expected := range result.Instance {
			assert.Equal(t, expected, state.Labels["rule-"+key])
		}
	})
	t.Run("rule annotations should be able to be expanded with result and extra labels", func(t *testing.T) {
		result := eval.Result{
			Instance: models.GenerateAlertLabels(5, "result-"),
		}
		rule := generateRule()
		extraLabels := models.GenerateAlertLabels(2, "extra-")
		// Each rule annotation is a template that echoes one extra/result label.
		annotationTemplates := make(data.Labels)
		for key := range extraLabels {
			annotationTemplates["rule-"+key] = fmt.Sprintf("{{ with (index .Labels \"%s\") }}{{.}}{{end}}", key)
		}
		for key := range result.Instance {
			annotationTemplates["rule-"+key] = fmt.Sprintf("{{ with (index .Labels \"%s\") }}{{.}}{{end}}", key)
		}
		rule.Annotations = annotationTemplates
		state := c.getOrCreate(context.Background(), rule, result, extraLabels)
		for key, expected := range extraLabels {
			assert.Equal(t, expected, state.Annotations["rule-"+key])
		}
		for key, expected := range result.Instance {
			assert.Equal(t, expected, state.Annotations["rule-"+key])
		}
	})
}
// Test_mergeLabels verifies that mergeLabels produces the union of two label
// sets and that, on conflicting keys, values from the first argument win.
func Test_mergeLabels(t *testing.T) {
	t.Run("merges two maps", func(t *testing.T) {
		left := models.GenerateAlertLabels(5, "set1-")
		right := models.GenerateAlertLabels(5, "set2-")
		merged := mergeLabels(left, right)
		require.Len(t, merged, len(left)+len(right))
		for key, value := range left {
			require.Equal(t, value, merged[key])
		}
		for key, value := range right {
			require.Equal(t, value, merged[key])
		}
	})
	t.Run("first set take precedence if conflict", func(t *testing.T) {
		first := models.GenerateAlertLabels(5, "set1-")
		second := models.GenerateAlertLabels(5, "set2-")
		// Build a copy of the second set that also redefines every key of the
		// first set with a different value.
		conflicting := second.Copy()
		for key, value := range first {
			conflicting[key] = "set2-" + value
		}
		merged := mergeLabels(first, conflicting)
		require.Len(t, merged, len(first)+len(second))
		for key, value := range first {
			require.Equal(t, value, merged[key])
		}
		for key, value := range second {
			require.Equal(t, value, merged[key])
		}
	})
}

View File

@ -136,8 +136,8 @@ func (st *Manager) Warm(ctx context.Context) {
}
}
func (st *Manager) getOrCreate(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result) *State {
return st.cache.getOrCreate(ctx, alertRule, result)
func (st *Manager) getOrCreate(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result, extraLabels data.Labels) *State {
return st.cache.getOrCreate(ctx, alertRule, result, extraLabels)
}
func (st *Manager) set(entry *State) {
@ -158,12 +158,14 @@ func (st *Manager) RemoveByRuleUID(orgID int64, ruleUID string) {
st.cache.removeByRuleUID(orgID, ruleUID)
}
func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time, alertRule *ngModels.AlertRule, results eval.Results) []*State {
// ProcessEvalResults updates the current states that belong to a rule with the evaluation results.
// if extraLabels is not empty, those labels will be added to every state. The extraLabels take precedence over rule labels and result labels
func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time, alertRule *ngModels.AlertRule, results eval.Results, extraLabels data.Labels) []*State {
st.log.Debug("state manager processing evaluation results", "uid", alertRule.UID, "resultCount", len(results))
var states []*State
processedResults := make(map[string]*State, len(results))
for _, result := range results {
s := st.setNextState(ctx, alertRule, result)
s := st.setNextState(ctx, alertRule, result, extraLabels)
states = append(states, s)
processedResults[s.CacheId] = s
}
@ -203,8 +205,8 @@ func (st *Manager) maybeTakeScreenshot(
}
// Set the current state based on evaluation results
func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result) *State {
currentState := st.getOrCreate(ctx, alertRule, result)
func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result, extraLabels data.Labels) *State {
currentState := st.getOrCreate(ctx, alertRule, result, extraLabels)
currentState.LastEvaluationTime = result.EvaluatedAt
currentState.EvaluationDuration = result.EvaluationDuration

View File

@ -53,7 +53,9 @@ func TestDashboardAnnotations(t *testing.T) {
Instance: data.Labels{"instance_label": "testValue2"},
State: eval.Alerting,
EvaluatedAt: evaluationTime,
}})
}}, data.Labels{
"alertname": rule.Title,
})
expected := []string{rule.Title + " {alertname=" + rule.Title + ", instance_label=testValue2, test1=testValue1, test2=testValue2} - Alerting"}
sort.Strings(expected)
@ -1984,7 +1986,11 @@ func TestProcessEvalResults(t *testing.T) {
annotations.SetRepository(fakeAnnoRepo)
for _, res := range tc.evalResults {
_ = st.ProcessEvalResults(context.Background(), evaluationTime, tc.alertRule, res)
_ = st.ProcessEvalResults(context.Background(), evaluationTime, tc.alertRule, res, data.Labels{
"alertname": tc.alertRule.Title,
"__alert_rule_namespace_uid__": tc.alertRule.NamespaceUID,
"__alert_rule_uid__": tc.alertRule.UID,
})
}
states := st.GetStatesForRuleUID(tc.alertRule.OrgID, tc.alertRule.UID)
@ -2109,7 +2115,11 @@ func TestStaleResultsHandler(t *testing.T) {
evalTime = re.EvaluatedAt
}
}
st.ProcessEvalResults(context.Background(), evalTime, rule, res)
st.ProcessEvalResults(context.Background(), evalTime, rule, res, data.Labels{
"alertname": rule.Title,
"__alert_rule_namespace_uid__": rule.NamespaceUID,
"__alert_rule_uid__": rule.UID,
})
for _, s := range tc.expectedStates {
cachedState, err := st.Get(s.OrgID, s.AlertRuleUID, s.CacheId)
require.NoError(t, err)