Alerting: Move rule evaluation status logic out of prometheus API and into scheduler (#89141)

* Add health fields to rules and an aggregator method to the scheduler

* Move health, last error, and last eval time together to minimize state processing

* Wire up a readonly scheduler to prom api

* Extract to exported function

* Use health in api_prometheus and fix up tests

* Rename health struct to status

* Fix tests one more time

* Several new tests

* Handle inactive rules

* Push state mapping into state manager

* rename to StatusReader

* Fix cyclomatic complexity issues after rebase

* Convert existing package local status implementation to models one

* fix tests

* undo RuleDefs rename
Alexander Weaver 2024-09-30 16:52:49 -05:00 committed by GitHub
parent 6a3eb276ef
commit 393faa8732
13 changed files with 213 additions and 23 deletions
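
Taken together, these commits move per-rule health aggregation out of the Prometheus-compatible API and behind a small, read-only view of the scheduler. The sketch below illustrates the resulting read path using names from the diffs that follow; the sketch package, the ruleHealth helper, and the import path are assumptions for illustration, not part of the change.

// Illustrative sketch only; the real definitions live in the diffs below.
package sketch

import (
	ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
)

// StatusReader is the narrow, read-only view of the scheduler that the
// Prometheus-compatible API now depends on (defined in api_prometheus.go).
type StatusReader interface {
	Status(key ngmodels.AlertRuleKey) (ngmodels.RuleStatus, bool)
}

// ruleHealth mirrors the fallback in toRuleGroup: rules the scheduler has not
// picked up yet are reported with a default "ok" health, which differs from
// Prometheus.
func ruleHealth(sr StatusReader, key ngmodels.AlertRuleKey) ngmodels.RuleStatus {
	if status, ok := sr.Status(key); ok {
		return status
	}
	return ngmodels.RuleStatus{Health: "ok"}
}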

View File

@ -63,6 +63,7 @@ type API struct {
DataProxy *datasourceproxy.DataSourceProxyService
MultiOrgAlertmanager *notifier.MultiOrgAlertmanager
StateManager *state.Manager
Scheduler StatusReader
AccessControl ac.AccessControl
Policies *provisioning.NotificationPolicyService
ReceiverService *notifier.ReceiverService
@ -115,7 +116,7 @@ func (api *API) RegisterAPIEndpoints(m *metrics.API) {
api.RegisterPrometheusApiEndpoints(NewForkingProm(
api.DatasourceCache,
NewLotexProm(proxy, logger),
&PrometheusSrv{log: logger, manager: api.StateManager, store: api.RuleStore, authz: ruleAuthzService},
&PrometheusSrv{log: logger, manager: api.StateManager, status: api.Scheduler, store: api.RuleStore, authz: ruleAuthzService},
), m)
// Register endpoints for proxying to Cortex Ruler-compatible backends.
api.RegisterRulerApiEndpoints(NewForkingRuler(

View File

@ -9,7 +9,6 @@ import (
"sort"
"strconv"
"strings"
"time"
"github.com/prometheus/alertmanager/pkg/labels"
apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
@ -24,9 +23,14 @@ import (
"github.com/grafana/grafana/pkg/util"
)
type StatusReader interface {
Status(key ngmodels.AlertRuleKey) (ngmodels.RuleStatus, bool)
}
type PrometheusSrv struct {
log log.Logger
manager state.AlertInstanceManager
status StatusReader
store RuleStore
authz RuleAccessControlService
}
@ -222,7 +226,7 @@ func (srv PrometheusSrv) RouteGetRuleStatuses(c *contextmodel.ReqContext) respon
namespaces[namespaceUID] = folder.Fullpath
}
ruleResponse = PrepareRuleGroupStatuses(srv.log, srv.manager, srv.store, RuleGroupStatusesOptions{
ruleResponse = PrepareRuleGroupStatuses(srv.log, srv.manager, srv.status, srv.store, RuleGroupStatusesOptions{
Ctx: c.Req.Context(),
OrgID: c.OrgID,
Query: c.Req.Form,
@ -235,7 +239,7 @@ func (srv PrometheusSrv) RouteGetRuleStatuses(c *contextmodel.ReqContext) respon
return response.JSON(ruleResponse.HTTPStatusCode(), ruleResponse)
}
func PrepareRuleGroupStatuses(log log.Logger, manager state.AlertInstanceManager, store ListAlertRulesStore, opts RuleGroupStatusesOptions) apimodels.RuleResponse {
func PrepareRuleGroupStatuses(log log.Logger, manager state.AlertInstanceManager, status StatusReader, store ListAlertRulesStore, opts RuleGroupStatusesOptions) apimodels.RuleResponse {
ruleResponse := apimodels.RuleResponse{
DiscoveryBase: apimodels.DiscoveryBase{
Status: "success",
@ -346,7 +350,7 @@ func PrepareRuleGroupStatuses(log log.Logger, manager state.AlertInstanceManager
continue
}
ruleGroup, totals := toRuleGroup(log, manager, groupKey, folder, rules, limitAlertsPerRule, withStatesFast, matchers, labelOptions)
ruleGroup, totals := toRuleGroup(log, manager, status, groupKey, folder, rules, limitAlertsPerRule, withStatesFast, matchers, labelOptions)
ruleGroup.Totals = totals
for k, v := range totals {
rulesTotals[k] += v
@ -432,7 +436,7 @@ func matchersMatch(matchers []*labels.Matcher, labels map[string]string) bool {
return true
}
func toRuleGroup(log log.Logger, manager state.AlertInstanceManager, groupKey ngmodels.AlertRuleGroupKey, folderFullPath string, rules []*ngmodels.AlertRule, limitAlerts int64, withStates map[eval.State]struct{}, matchers labels.Matchers, labelOptions []ngmodels.LabelOption) (*apimodels.RuleGroup, map[string]int64) {
func toRuleGroup(log log.Logger, manager state.AlertInstanceManager, sr StatusReader, groupKey ngmodels.AlertRuleGroupKey, folderFullPath string, rules []*ngmodels.AlertRule, limitAlerts int64, withStates map[eval.State]struct{}, matchers labels.Matchers, labelOptions []ngmodels.LabelOption) (*apimodels.RuleGroup, map[string]int64) {
newGroup := &apimodels.RuleGroup{
Name: groupKey.RuleGroup,
// file is what Prometheus uses for provisioning; we replace it with the namespace, which is the folder in Grafana.
@ -443,6 +447,15 @@ func toRuleGroup(log log.Logger, manager state.AlertInstanceManager, groupKey ng
ngmodels.RulesGroup(rules).SortByGroupIndex()
for _, rule := range rules {
status, ok := sr.Status(rule.GetKey())
// By design, Grafana returns "ok" health and default values for the other fields for unscheduled rules.
// This differs from Prometheus.
if !ok {
status = ngmodels.RuleStatus{
Health: "ok",
}
}
alertingRule := apimodels.AlertingRule{
State: "inactive",
Name: rule.Title,
@ -454,9 +467,11 @@ func toRuleGroup(log log.Logger, manager state.AlertInstanceManager, groupKey ng
newRule := apimodels.Rule{
Name: rule.Title,
Labels: apimodels.LabelsFromMap(rule.GetLabels(labelOptions...)),
Health: "ok",
Health: status.Health,
LastError: errorOrEmpty(status.LastError),
Type: rule.Type().String(),
LastEvaluation: time.Time{},
LastEvaluation: status.EvaluationTimestamp,
EvaluationTime: status.EvaluationDuration.Seconds(),
}
states := manager.GetStatesForRuleUID(rule.OrgID, rule.UID)
@ -485,12 +500,6 @@ func toRuleGroup(log log.Logger, manager state.AlertInstanceManager, groupKey ng
Value: valString,
}
if alertState.LastEvaluationTime.After(newRule.LastEvaluation) {
newRule.LastEvaluation = alertState.LastEvaluationTime
}
newRule.EvaluationTime = alertState.EvaluationDuration.Seconds()
switch alertState.State {
case eval.Normal:
case eval.Pending:
@ -503,14 +512,7 @@ func toRuleGroup(log log.Logger, manager state.AlertInstanceManager, groupKey ng
}
alertingRule.State = "firing"
case eval.Error:
newRule.Health = "error"
case eval.NoData:
newRule.Health = "nodata"
}
if alertState.Error != nil {
newRule.LastError = alertState.Error.Error()
newRule.Health = "error"
}
if len(withStates) > 0 {
@ -604,3 +606,10 @@ func encodedQueriesOrError(rules []ngmodels.AlertQuery) string {
return err.Error()
}
func errorOrEmpty(err error) string {
if err != nil {
return err.Error()
}
return ""
}

View File

@ -489,6 +489,7 @@ func TestRouteGetRuleStatuses(t *testing.T) {
t.Run("should return sorted", func(t *testing.T) {
ruleStore := fakes.NewRuleStore(t)
fakeAIM := NewFakeAlertInstanceManager(t)
fakeSch := newFakeSchedulerReader(t).setupStates(fakeAIM)
groupKey := ngmodels.GenerateGroupKey(orgID)
gen := ngmodels.RuleGen
rules := gen.With(gen.WithGroupKey(groupKey), gen.WithUniqueGroupIndex()).GenerateManyRef(5, 10)
@ -497,6 +498,7 @@ func TestRouteGetRuleStatuses(t *testing.T) {
api := PrometheusSrv{
log: log.NewNopLogger(),
manager: fakeAIM,
status: fakeSch,
store: ruleStore,
authz: &fakeRuleAccessControlService{},
}
@ -558,6 +560,7 @@ func TestRouteGetRuleStatuses(t *testing.T) {
api := PrometheusSrv{
log: log.NewNopLogger(),
manager: fakeAIM,
status: newFakeSchedulerReader(t).setupStates(fakeAIM),
store: ruleStore,
authz: accesscontrol.NewRuleService(acimpl.ProvideAccessControl(featuremgmt.WithFeatures(), zanzana.NewNoopClient())),
}
@ -673,6 +676,7 @@ func TestRouteGetRuleStatuses(t *testing.T) {
api := PrometheusSrv{
log: log.NewNopLogger(),
manager: fakeAIM,
status: newFakeSchedulerReader(t).setupStates(fakeAIM),
store: ruleStore,
authz: accesscontrol.NewRuleService(acimpl.ProvideAccessControl(featuremgmt.WithFeatures(), zanzana.NewNoopClient())),
}
@ -1389,11 +1393,13 @@ func TestRouteGetRuleStatuses(t *testing.T) {
func setupAPI(t *testing.T) (*fakes.RuleStore, *fakeAlertInstanceManager, PrometheusSrv) {
fakeStore := fakes.NewRuleStore(t)
fakeAIM := NewFakeAlertInstanceManager(t)
fakeSch := newFakeSchedulerReader(t).setupStates(fakeAIM)
fakeAuthz := &fakeRuleAccessControlService{}
api := PrometheusSrv{
log: log.NewNopLogger(),
manager: fakeAIM,
status: fakeSch,
store: fakeStore,
authz: fakeAuthz,
}

View File

@ -166,3 +166,29 @@ func (f fakeRuleAccessControlService) AuthorizeDatasourceAccessForRule(ctx conte
func (f fakeRuleAccessControlService) AuthorizeDatasourceAccessForRuleGroup(ctx context.Context, user identity.Requester, rules models.RulesGroup) error {
return nil
}
type statesReader interface {
GetStatesForRuleUID(orgID int64, alertRuleUID string) []*state.State
}
type fakeSchedulerReader struct {
states statesReader
}
func newFakeSchedulerReader(t *testing.T) *fakeSchedulerReader {
return &fakeSchedulerReader{}
}
// setupStates allows the fake scheduler to return data consistent with states defined elsewhere.
// This can be combined with fakeAlertInstanceManager, for instance.
func (f *fakeSchedulerReader) setupStates(reader statesReader) *fakeSchedulerReader {
f.states = reader
return f
}
func (f *fakeSchedulerReader) Status(key models.AlertRuleKey) (models.RuleStatus, bool) {
if f.states == nil {
return models.RuleStatus{}, false
}
return state.StatesToRuleStatus(f.states.GetStatesForRuleUID(key.OrgID, key.UID)), true
}

View File

@ -886,3 +886,11 @@ func (r *Record) Fingerprint() data.Fingerprint {
func hasAnyCondition(rule *AlertRuleWithOptionals) bool {
return rule.Condition != "" || (rule.Record != nil && rule.Record.From != "")
}
// RuleStatus contains info about a rule's current evaluation state.
type RuleStatus struct {
Health string
LastError error
EvaluationTimestamp time.Time
EvaluationDuration time.Duration
}

View File

@ -478,6 +478,7 @@ func (ng *AlertNG) init() error {
ProvenanceStore: ng.store,
MultiOrgAlertmanager: ng.MultiOrgAlertmanager,
StateManager: ng.stateManager,
Scheduler: scheduler,
AccessControl: ng.accesscontrol,
Policies: policyService,
ReceiverService: receiverService,

View File

@ -40,6 +40,8 @@ type Rule interface {
Update(lastVersion RuleVersionAndPauseStatus) bool
// Type gives the type of the rule.
Type() ngmodels.RuleType
// Status returns the current evaluation status of the rule.
Status() ngmodels.RuleStatus
}
type ruleFactoryFunc func(context.Context, *ngmodels.AlertRule) Rule
@ -180,6 +182,10 @@ func (a *alertRule) Type() ngmodels.RuleType {
return ngmodels.RuleTypeAlerting
}
func (a *alertRule) Status() ngmodels.RuleStatus {
return a.stateManager.GetStatusForRuleUID(a.key.OrgID, a.key.UID)
}
// eval signals the rule evaluation routine to perform the evaluation of the rule. Does nothing if the loop is stopped.
// Before sending a message into the channel, it does a non-blocking read to make sure that there is no concurrent send operation.
// Returns a tuple where first element is

View File

@ -369,6 +369,17 @@ func TestRuleRoutine(t *testing.T) {
require.Equal(t, s.Labels, data.Labels(cmd.Labels))
})
t.Run("status should accurately reflect latest evaluation", func(t *testing.T) {
states := sch.stateManager.GetStatesForRuleUID(rule.OrgID, rule.UID)
require.NotEmpty(t, states)
status := ruleInfo.Status()
require.Equal(t, "ok", status.Health)
require.Nil(t, status.LastError)
require.Equal(t, states[0].LastEvaluationTime, status.EvaluationTimestamp)
require.Equal(t, states[0].EvaluationDuration, status.EvaluationDuration)
})
t.Run("it reports metrics", func(t *testing.T) {
// the duration metric has 0 values because the mocked clock does not advance
expectedMetric := fmt.Sprintf(
@ -700,6 +711,15 @@ func TestRuleRoutine(t *testing.T) {
assert.Len(t, args.PostableAlerts, 1)
assert.Equal(t, state.ErrorAlertName, args.PostableAlerts[0].Labels[prometheusModel.AlertNameLabel])
})
t.Run("status should reflect unhealthy rule", func(t *testing.T) {
status := ruleInfo.Status()
require.Equal(t, "error", status.Health)
require.NotNil(t, status.LastError, "expected status to carry the latest evaluation error")
require.Contains(t, status.LastError.Error(), "cannot reference itself")
require.Equal(t, int64(0), status.EvaluationTimestamp.UTC().Unix())
require.Equal(t, time.Duration(0), status.EvaluationDuration)
})
})
t.Run("when there are alerts that should be firing", func(t *testing.T) {

View File

@ -84,8 +84,8 @@ func (r *recordingRule) Type() ngmodels.RuleType {
return ngmodels.RuleTypeRecording
}
func (r *recordingRule) Status() RuleStatus {
return RuleStatus{
func (r *recordingRule) Status() ngmodels.RuleStatus {
return ngmodels.RuleStatus{
Health: r.health.Load(),
LastError: r.lastError.Load(),
EvaluationTimestamp: r.evaluationTimestamp.Load(),

View File

@ -56,6 +56,14 @@ func (r *ruleRegistry) exists(key models.AlertRuleKey) bool {
return ok
}
// get fetches a rule from the registry by key. It returns (rule, ok) where ok is false if the rule did not exist.
func (r *ruleRegistry) get(key models.AlertRuleKey) (Rule, bool) {
r.mu.Lock()
defer r.mu.Unlock()
ru, ok := r.rules[key]
return ru, ok
}
// del removes the pair with the specified key from the registry.
// It returns a 2-tuple where the first element is the value of the removed pair
// and the second indicates whether an element with the specified key existed.

View File

@ -171,6 +171,14 @@ func (sch *schedule) Rules() ([]*ngmodels.AlertRule, map[ngmodels.FolderKey]stri
return sch.schedulableAlertRules.all()
}
// Status fetches the health of a given scheduled rule, by key.
func (sch *schedule) Status(key ngmodels.AlertRuleKey) (ngmodels.RuleStatus, bool) {
if rule, ok := sch.registry.get(key); ok {
return rule.Status(), true
}
return ngmodels.RuleStatus{}, false
}
// deleteAlertRule stops evaluation of the rule, deletes it from active rules, and cleans up state cache.
func (sch *schedule) deleteAlertRule(keys ...ngmodels.AlertRuleKey) {
for _, key := range keys {

View File

@ -113,6 +113,11 @@ func TestProcessTicks(t *testing.T) {
folderWithRuleGroup1 := fmt.Sprintf("%s;%s", ruleStore.getNamespaceTitle(alertRule1.NamespaceUID), alertRule1.RuleGroup)
t.Run("before 1st tick status should not be available", func(t *testing.T) {
_, ok := sched.Status(alertRule1.GetKey())
require.False(t, ok, "status for a rule should not be present before the scheduler has created it")
})
t.Run("on 1st tick alert rule should be evaluated", func(t *testing.T) {
tick = tick.Add(cfg.BaseInterval)
@ -137,12 +142,25 @@ func TestProcessTicks(t *testing.T) {
require.NoError(t, err)
})
t.Run("after 1st tick status for rule should be available", func(t *testing.T) {
_, ok := sched.Status(alertRule1.GetKey())
require.True(t, ok, "status for a rule that just evaluated was not available")
// The rules in this test are randomly generated and are sometimes invalid, so we can't
// reliably assert anything about the actual health: it may or may not be an error.
// We only verify that the rule was scheduled, not that the rule routine worked internally.
})
// add alert rule under main org with three base intervals
alertRule2 := gen.With(gen.WithOrgID(mainOrgID), gen.WithInterval(3*cfg.BaseInterval), gen.WithTitle("rule-2")).GenerateRef()
ruleStore.PutRule(ctx, alertRule2)
folderWithRuleGroup2 := fmt.Sprintf("%s;%s", ruleStore.getNamespaceTitle(alertRule2.NamespaceUID), alertRule2.RuleGroup)
t.Run("before 2nd tick status for rule should not be available", func(t *testing.T) {
_, ok := sched.Status(alertRule2.GetKey())
require.False(t, ok, "status for a rule should not be present before the scheduler has created it")
})
t.Run("on 2nd tick first alert rule should be evaluated", func(t *testing.T) {
tick = tick.Add(cfg.BaseInterval)
scheduled, stopped, updated := sched.processTick(ctx, dispatcherGroup, tick)
@ -184,6 +202,16 @@ func TestProcessTicks(t *testing.T) {
assertEvalRun(t, evalAppliedCh, tick, keys...)
})
t.Run("after 3rd tick status for both rules should be available", func(t *testing.T) {
_, ok := sched.Status(alertRule1.GetKey())
require.True(t, ok, "status for a rule that just evaluated was not available")
_, ok = sched.Status(alertRule2.GetKey())
require.True(t, ok, "status for a rule that just evaluated was not available")
// The rules in this test are randomly generated and are sometimes invalid, so we can't
// reliably assert anything about the actual health: it may or may not be an error.
// We only verify that the rules were scheduled, not that the rule routine worked internally.
})
t.Run("on 4th tick only one alert rule should be evaluated", func(t *testing.T) {
tick = tick.Add(cfg.BaseInterval)
scheduled, stopped, updated := sched.processTick(ctx, dispatcherGroup, tick)
@ -223,6 +251,16 @@ func TestProcessTicks(t *testing.T) {
require.NoError(t, err)
})
t.Run("after 5th tick status for both rules should be available regardless of pause state", func(t *testing.T) {
_, ok := sched.Status(alertRule1.GetKey())
require.True(t, ok, "status for a rule that just evaluated was not available")
_, ok = sched.Status(alertRule2.GetKey())
require.True(t, ok, "status for a rule that just evaluated was not available")
// The rules in this test are randomly generated and are sometimes invalid, so we can't
// reliably assert anything about the actual health: it may or may not be an error.
// We only verify that the rules were scheduled, not that the rule routine worked internally.
})
t.Run("on 6th tick all alert rule are paused (it still enters evaluation but it is early skipped)", func(t *testing.T) {
tick = tick.Add(cfg.BaseInterval)
@ -309,6 +347,13 @@ func TestProcessTicks(t *testing.T) {
require.NoError(t, err)
})
t.Run("after 8th tick status for deleted rule should not be available", func(t *testing.T) {
_, ok := sched.Status(alertRule1.GetKey())
require.False(t, ok, "status for a rule that was deleted should not be available")
_, ok = sched.Status(alertRule2.GetKey())
require.True(t, ok, "status for a rule that just evaluated was not available")
})
t.Run("on 9th tick one alert rule should be evaluated", func(t *testing.T) {
tick = tick.Add(cfg.BaseInterval)
@ -338,6 +383,14 @@ func TestProcessTicks(t *testing.T) {
require.Emptyf(t, updated, "None rules are expected to be updated")
assertEvalRun(t, evalAppliedCh, tick, alertRule3.GetKey())
})
t.Run("after 10th tick status for remaining rules should be available", func(t *testing.T) {
_, ok := sched.Status(alertRule1.GetKey())
require.False(t, ok, "status for a rule that was deleted should not be available")
_, ok = sched.Status(alertRule2.GetKey())
require.True(t, ok, "status for a rule that just evaluated was not available")
_, ok = sched.Status(alertRule3.GetKey())
require.True(t, ok, "status for a rule that just evaluated was not available")
})
t.Run("on 11th tick rule2 should be updated", func(t *testing.T) {
newRule2 := models.CopyRule(alertRule2)
newRule2.Version++
@ -465,6 +518,14 @@ func TestProcessTicks(t *testing.T) {
require.Emptyf(t, updated, "No rules should be updated")
})
t.Run("after 12th tick no status should be available", func(t *testing.T) {
_, ok := sched.Status(alertRule1.GetKey())
require.False(t, ok, "status for a rule that was deleted should not be available")
_, ok = sched.Status(alertRule2.GetKey())
require.False(t, ok, "status for a rule that just evaluated was not available")
_, ok = sched.Status(alertRule3.GetKey())
require.False(t, ok, "status for a rule that just evaluated was not available")
})
t.Run("scheduled rules should be sorted", func(t *testing.T) {
rules := gen.With(gen.WithOrgID(mainOrgID), gen.WithInterval(cfg.BaseInterval)).GenerateManyRef(10, 20)

View File

@ -556,6 +556,11 @@ func (st *Manager) GetStatesForRuleUID(orgID int64, alertRuleUID string) []*Stat
return st.cache.getStatesForRuleUID(orgID, alertRuleUID, st.doNotSaveNormalState)
}
func (st *Manager) GetStatusForRuleUID(orgID int64, alertRuleUID string) ngModels.RuleStatus {
states := st.GetStatesForRuleUID(orgID, alertRuleUID)
return StatesToRuleStatus(states)
}
func (st *Manager) Put(states []*State) {
for _, s := range states {
st.cache.set(s)
@ -623,3 +628,34 @@ func (st *Manager) deleteStaleStatesFromCache(ctx context.Context, logger log.Lo
func stateIsStale(evaluatedAt time.Time, lastEval time.Time, intervalSeconds int64) bool {
return !lastEval.Add(2 * time.Duration(intervalSeconds) * time.Second).After(evaluatedAt)
}
func StatesToRuleStatus(states []*State) ngModels.RuleStatus {
status := ngModels.RuleStatus{
Health: "ok",
LastError: nil,
EvaluationTimestamp: time.Time{},
}
for _, state := range states {
if state.LastEvaluationTime.After(status.EvaluationTimestamp) {
status.EvaluationTimestamp = state.LastEvaluationTime
}
status.EvaluationDuration = state.EvaluationDuration
switch state.State {
case eval.Normal:
case eval.Pending:
case eval.Alerting:
case eval.Error:
status.Health = "error"
case eval.NoData:
status.Health = "nodata"
}
if state.Error != nil {
status.LastError = state.Error
status.Health = "error"
}
}
return status
}
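
As a quick usage illustration of the aggregation above (a hypothetical snippet assuming the usual ngalert import paths; not part of the change): given one normal state and one errored state, the combined status reports "error" health, carries the error, and takes the latest evaluation time.

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/grafana/grafana/pkg/services/ngalert/eval"
	"github.com/grafana/grafana/pkg/services/ngalert/state"
)

func main() {
	now := time.Now()
	states := []*state.State{
		{State: eval.Normal, LastEvaluationTime: now.Add(-time.Minute), EvaluationDuration: 5 * time.Millisecond},
		{State: eval.Error, Error: errors.New("query failed"), LastEvaluationTime: now, EvaluationDuration: 8 * time.Millisecond},
	}

	status := state.StatesToRuleStatus(states)
	fmt.Println(status.Health)    // "error"
	fmt.Println(status.LastError) // query failed
	// EvaluationTimestamp is the latest LastEvaluationTime across the states;
	// EvaluationDuration is taken from the last state iterated.
}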