Alerting/Annotations: Add annotation backend for Loki alert state history (#78156)

* Move scope type vars to testutil package

* Expose parts of state historian for use in annotation backend

* Implement Loki ASH Annotation store

This store will only implement the `Get` method of a RepositoryImpl since alert state history
writes to Loki elsewhere.

* Use interface for Loki HTTP Client

* Add tests for Loki ASH Annotation store

* Add missing test

* Fix lint

* Organize tests

* Add filter tests

* Improve tests

* Move filter logic into outer function

* Fix lint

* Add comment

* Fix tests

* Fix lint

* Rename historian store + refactor

* Cleanup historian store

* Fix tests

* Minor cleanup

* Use new `ShouldRecordAnnotation` filter

* Fix logic and add tests for this check

* Fix typos, remove unused variables, `< 1` -> `== 0`

* More closely mimic RBAC filter from xorm to ensure correct logic

* Move off weaveworks client

* Address PR comments
This commit is contained in:
William Wernert 2024-01-10 18:42:35 -05:00 committed by GitHub
parent 2c09f969f1
commit 48b5ac779b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 1025 additions and 50 deletions

View File

@ -2,21 +2,43 @@ package loki
import (
"context"
"encoding/json"
"errors"
"fmt"
"sort"
"time"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/annotations"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/services/annotations/accesscontrol"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/ngalert"
"golang.org/x/exp/constraints"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
ngmetrics "github.com/grafana/grafana/pkg/services/ngalert/metrics"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/state/historian"
"github.com/grafana/grafana/pkg/setting"
historymodel "github.com/grafana/grafana/pkg/services/ngalert/state/historian/model"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/grafana/pkg/services/annotations"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util/errutil"
)
const (
subsystem = "annotations"
subsystem = "annotations"
defaultQueryRange = 6 * time.Hour // from grafana/pkg/services/ngalert/state/historian/loki.go
)
var (
ErrLokiStoreInternal = errutil.Internal("annotations.loki.internal")
ErrLokiStoreNotFound = errutil.NotFound("annotations.loki.notFound")
errMissingRule = errors.New("rule not found")
)
type lokiQueryClient interface {
@ -48,13 +70,210 @@ func NewLokiHistorianStore(cfg setting.UnifiedAlertingStateHistorySettings, ft f
}
func (r *LokiHistorianStore) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
return []*annotations.ItemDTO{}, nil
rule := &ngmodels.AlertRule{}
if query.AlertID != 0 {
var err error
rule, err = getRule(ctx, r.db, query.OrgID, query.AlertID)
if err != nil {
if errors.Is(err, errMissingRule) {
return make([]*annotations.ItemDTO, 0), ErrLokiStoreNotFound.Errorf("rule with ID %d does not exist", query.AlertID)
}
return make([]*annotations.ItemDTO, 0), ErrLokiStoreInternal.Errorf("failed to query rule: %w", err)
}
}
logQL, err := historian.BuildLogQuery(buildHistoryQuery(query, accessResources.Dashboards, rule.UID))
if err != nil {
return make([]*annotations.ItemDTO, 0), ErrLokiStoreInternal.Errorf("failed to build loki query: %w", err)
}
now := time.Now().UTC()
if query.To == 0 {
query.To = now.UnixMilli()
}
if query.From == 0 {
query.From = now.Add(-defaultQueryRange).UnixMilli()
}
// query.From and query.To are always in milliseconds, convert them to nanoseconds for loki
from := query.From * 1e6
to := query.To * 1e6
res, err := r.client.RangeQuery(ctx, logQL, from, to, query.Limit)
if err != nil {
return make([]*annotations.ItemDTO, 0), ErrLokiStoreInternal.Errorf("failed to query loki: %w", err)
}
items := make([]*annotations.ItemDTO, 0)
for _, stream := range res.Data.Result {
items = append(items, r.annotationsFromStream(stream, *accessResources)...)
}
sort.Sort(annotations.SortedItems(items))
return items, err
}
func (r *LokiHistorianStore) annotationsFromStream(stream historian.Stream, ac accesscontrol.AccessResources) []*annotations.ItemDTO {
items := make([]*annotations.ItemDTO, 0, len(stream.Values))
for _, sample := range stream.Values {
entry := historian.LokiEntry{}
err := json.Unmarshal([]byte(sample.V), &entry)
if err != nil {
// bad data, skip
continue
}
if !hasAccess(entry, ac) {
// no access to this annotation, skip
continue
}
transition, err := buildTransition(entry)
if err != nil {
// bad data, skip
continue
}
if !historian.ShouldRecordAnnotation(*transition) {
// skip non-annotation transition
continue
}
annotationText, annotationData := historian.BuildAnnotationTextAndData(
historymodel.RuleMeta{
Title: entry.RuleTitle,
},
transition.State,
)
items = append(items, &annotations.ItemDTO{
AlertID: entry.RuleID,
DashboardID: ac.Dashboards[entry.DashboardUID],
DashboardUID: &entry.DashboardUID,
PanelID: entry.PanelID,
NewState: entry.Current,
PrevState: entry.Previous,
Time: sample.T.UnixMilli(),
Text: annotationText,
Data: annotationData,
})
}
return items
}
func (r *LokiHistorianStore) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) {
return annotations.FindTagsResult{}, nil
}
// util
func getRule(ctx context.Context, sql db.DB, orgID int64, ruleID int64) (*ngmodels.AlertRule, error) {
rule := &ngmodels.AlertRule{OrgID: orgID, ID: ruleID}
err := sql.WithDbSession(ctx, func(sess *db.Session) error {
exists, err := sess.Get(rule)
if err != nil {
return err
}
if !exists {
return errMissingRule
}
return nil
})
return rule, err
}
func hasAccess(entry historian.LokiEntry, resources accesscontrol.AccessResources) bool {
orgFilter := resources.CanAccessOrgAnnotations && entry.DashboardUID == ""
dashFilter := func() bool {
if !resources.CanAccessDashAnnotations {
return false
}
_, canAccess := resources.Dashboards[entry.DashboardUID]
return canAccess
}
return orgFilter || dashFilter()
}
type number interface {
constraints.Integer | constraints.Float
}
// numericMap converts a simplejson map[string]any to a map[string]N, where N is numeric (int or float).
func numericMap[N number](j *simplejson.Json) (map[string]N, error) {
m, err := j.Map()
if err != nil {
return nil, err
}
values := make(map[string]N)
for k, v := range m {
a, ok := (v).(json.Number)
if !ok {
return nil, fmt.Errorf("unexpected value type %T", v)
}
f, err := a.Float64()
if err != nil {
return nil, err
}
values[k] = N(f)
}
return values, nil
}
func buildTransition(entry historian.LokiEntry) (*state.StateTransition, error) {
curState, curStateReason, err := state.ParseFormattedState(entry.Current)
if err != nil {
return nil, fmt.Errorf("parsing current state: %w", err)
}
prevState, prevReason, err := state.ParseFormattedState(entry.Previous)
if err != nil {
return nil, fmt.Errorf("parsing previous state: %w", err)
}
v, err := numericMap[float64](entry.Values)
if err != nil {
return nil, fmt.Errorf("parsing entry values: %w", err)
}
return &state.StateTransition{
State: &state.State{
State: curState,
StateReason: curStateReason,
Values: v,
Labels: entry.InstanceLabels,
},
PreviousState: prevState,
PreviousStateReason: prevReason,
}, nil
}
func buildHistoryQuery(query *annotations.ItemQuery, dashboards map[string]int64, ruleUID string) ngmodels.HistoryQuery {
historyQuery := ngmodels.HistoryQuery{
OrgID: query.OrgID,
DashboardUID: query.DashboardUID,
PanelID: query.PanelID,
RuleUID: ruleUID,
}
if historyQuery.DashboardUID == "" && query.DashboardID != 0 {
for uid, id := range dashboards {
if query.DashboardID == id {
historyQuery.DashboardUID = uid
break
}
}
}
return historyQuery
}
func useStore(cfg setting.UnifiedAlertingStateHistorySettings, ft featuremgmt.FeatureToggles) bool {
if !cfg.Enabled {
return false
@ -62,7 +281,7 @@ func useStore(cfg setting.UnifiedAlertingStateHistorySettings, ft featuremgmt.Fe
// Override config based on feature toggles.
// We pass in a no-op logger here since this function is also called during ngalert init,
// and we don't want to log the same problem twice.
// and we don't want to log the same info twice.
ngalert.ApplyStateHistoryFeatureToggles(&cfg, ft, log.NewNopLogger())
backend, err := historian.ParseBackendType(cfg.Backend)
@ -70,6 +289,6 @@ func useStore(cfg setting.UnifiedAlertingStateHistorySettings, ft featuremgmt.Fe
return false
}
// We should only query Loki if annotations do no exist in the database.
// We should only query Loki if annotations do not exist in the database.
return backend == historian.BackendTypeLoki
}

View File

@ -1,13 +1,707 @@
package loki
import (
"context"
"encoding/json"
"errors"
"math/rand"
"net/url"
"strconv"
"sync"
"testing"
"time"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/annotations"
annotation_ac "github.com/grafana/grafana/pkg/services/annotations/accesscontrol"
"github.com/grafana/grafana/pkg/services/annotations/testutil"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/ngalert/client"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/state/historian"
historymodel "github.com/grafana/grafana/pkg/services/ngalert/state/historian/model"
"github.com/grafana/grafana/pkg/setting"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
func TestIntegrationAlertStateHistoryStore(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")
}
sql := db.InitTestDB(t)
dashboard1 := testutil.CreateDashboard(t, sql, featuremgmt.WithFeatures(), dashboards.SaveDashboardCommand{
UserID: 1,
OrgID: 1,
Dashboard: simplejson.NewFromAny(map[string]any{
"title": "Dashboard 1",
}),
})
dashboard2 := testutil.CreateDashboard(t, sql, featuremgmt.WithFeatures(), dashboards.SaveDashboardCommand{
UserID: 1,
OrgID: 1,
Dashboard: simplejson.NewFromAny(map[string]any{
"title": "Dashboard 2",
}),
})
knownUIDs := &sync.Map{}
dashboardRules := map[string][]*ngmodels.AlertRule{
dashboard1.UID: {
createAlertRuleWithDashboard(t, sql, knownUIDs, "Test Rule 1", dashboard1.UID),
createAlertRuleWithDashboard(t, sql, knownUIDs, "Test Rule 2", dashboard1.UID),
},
dashboard2.UID: {
createAlertRuleWithDashboard(t, sql, knownUIDs, "Test Rule 3", dashboard2.UID),
},
}
t.Run("Testing Loki state history read", func(t *testing.T) {
start := time.Now()
numTransitions := 2
transitions := genStateTransitions(t, numTransitions, start)
fakeLokiClient := NewFakeLokiClient()
store := createTestLokiStore(t, sql, fakeLokiClient)
t.Run("can query history by alert id", func(t *testing.T) {
rule := dashboardRules[dashboard1.UID][0]
fakeLokiClient.Response = []historian.Stream{
historian.StatesToStream(ruleMetaFromRule(t, rule), transitions, map[string]string{}, log.NewNopLogger()),
}
query := annotations.ItemQuery{
OrgID: 1,
AlertID: rule.ID,
From: start.UnixMilli(),
To: start.Add(time.Second * time.Duration(numTransitions+1)).UnixMilli(),
}
res, err := store.Get(
context.Background(),
&query,
&annotation_ac.AccessResources{
Dashboards: map[string]int64{
dashboard1.UID: dashboard1.ID,
},
CanAccessDashAnnotations: true,
},
)
require.NoError(t, err)
require.Len(t, res, numTransitions)
})
t.Run("can query history by dashboard id", func(t *testing.T) {
fakeLokiClient.Response = []historian.Stream{
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()),
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()),
}
query := annotations.ItemQuery{
OrgID: 1,
DashboardID: dashboard1.ID,
From: start.UnixMilli(),
To: start.Add(time.Second * time.Duration(numTransitions+1)).UnixMilli(),
}
res, err := store.Get(
context.Background(),
&query,
&annotation_ac.AccessResources{
Dashboards: map[string]int64{
dashboard1.UID: dashboard1.ID,
},
CanAccessDashAnnotations: true,
},
)
require.NoError(t, err)
require.Len(t, res, 2*numTransitions)
})
t.Run("should not find any when history is outside time range", func(t *testing.T) {
query := annotations.ItemQuery{
OrgID: 1,
DashboardID: dashboard1.ID,
From: start.Add(-2 * time.Second).UnixMilli(),
To: start.Add(-1 * time.Second).UnixMilli(),
}
res, err := store.Get(
context.Background(),
&query,
&annotation_ac.AccessResources{
Dashboards: map[string]int64{
dashboard1.UID: dashboard1.ID,
},
CanAccessDashAnnotations: true,
},
)
require.NoError(t, err)
require.Len(t, res, 0)
})
t.Run("should sort history by time", func(t *testing.T) {
fakeLokiClient.Response = []historian.Stream{
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][0]), transitions, map[string]string{}, log.NewNopLogger()),
historian.StatesToStream(ruleMetaFromRule(t, dashboardRules[dashboard1.UID][1]), transitions, map[string]string{}, log.NewNopLogger()),
}
query := annotations.ItemQuery{
OrgID: 1,
DashboardID: dashboard1.ID,
From: start.UnixMilli(),
To: start.Add(time.Second * time.Duration(numTransitions+1)).UnixMilli(),
}
res, err := store.Get(
context.Background(),
&query,
&annotation_ac.AccessResources{
Dashboards: map[string]int64{
dashboard1.UID: dashboard1.ID,
},
CanAccessDashAnnotations: true,
},
)
require.NoError(t, err)
require.Len(t, res, 2*numTransitions)
var lastTime int64
for _, item := range res {
if lastTime != 0 {
require.True(t, item.Time <= lastTime)
}
lastTime = item.Time
}
})
})
t.Run("Testing items from Loki stream", func(t *testing.T) {
fakeLokiClient := NewFakeLokiClient()
store := createTestLokiStore(t, sql, fakeLokiClient)
t.Run("should return empty list when no streams", func(t *testing.T) {
items := store.annotationsFromStream(historian.Stream{}, annotation_ac.AccessResources{})
require.Empty(t, items)
})
t.Run("should return empty list when no entries", func(t *testing.T) {
items := store.annotationsFromStream(historian.Stream{
Values: []historian.Sample{},
}, annotation_ac.AccessResources{})
require.Empty(t, items)
})
t.Run("should return one annotation per sample", func(t *testing.T) {
rule := dashboardRules[dashboard1.UID][0]
start := time.Now()
numTransitions := 2
transitions := genStateTransitions(t, numTransitions, start)
stream := historian.StatesToStream(ruleMetaFromRule(t, rule), transitions, map[string]string{}, log.NewNopLogger())
items := store.annotationsFromStream(stream, annotation_ac.AccessResources{
Dashboards: map[string]int64{
dashboard1.UID: dashboard1.ID,
},
CanAccessDashAnnotations: true,
})
require.Len(t, items, numTransitions)
for i := 0; i < numTransitions; i++ {
item := items[i]
transition := transitions[i]
expected := &annotations.ItemDTO{
AlertID: rule.ID,
DashboardID: dashboard1.ID,
DashboardUID: &dashboard1.UID,
PanelID: *rule.PanelID,
Time: transition.State.LastEvaluationTime.UnixMilli(),
NewState: transition.Formatted(),
}
if i > 0 {
prevTransition := transitions[i-1]
expected.PrevState = prevTransition.Formatted()
}
compareAnnotationItem(t, expected, item)
}
})
t.Run("should filter out annotations from dashboards not in scope", func(t *testing.T) {
start := time.Now()
numTransitions := 2
transitions := genStateTransitions(t, numTransitions, start)
rule := dashboardRules[dashboard1.UID][0]
stream1 := historian.StatesToStream(ruleMetaFromRule(t, rule), transitions, map[string]string{}, log.NewNopLogger())
rule = createAlertRule(t, sql, knownUIDs, "Test rule")
stream2 := historian.StatesToStream(ruleMetaFromRule(t, rule), transitions, map[string]string{}, log.NewNopLogger())
stream := historian.Stream{
Values: append(stream1.Values, stream2.Values...),
Stream: stream1.Stream,
}
items := store.annotationsFromStream(stream, annotation_ac.AccessResources{
Dashboards: map[string]int64{
dashboard1.UID: dashboard1.ID,
},
CanAccessDashAnnotations: true,
})
require.Len(t, items, numTransitions)
for _, item := range items {
require.Equal(t, dashboard1.ID, item.DashboardID)
require.Equal(t, dashboard1.UID, *item.DashboardUID)
}
})
t.Run("should include only annotations without linked dashboard on org scope", func(t *testing.T) {
start := time.Now()
numTransitions := 2
transitions := genStateTransitions(t, numTransitions, start)
rule := dashboardRules[dashboard1.UID][0]
stream1 := historian.StatesToStream(ruleMetaFromRule(t, rule), transitions, map[string]string{}, log.NewNopLogger())
rule.DashboardUID = nil
stream2 := historian.StatesToStream(ruleMetaFromRule(t, rule), transitions, map[string]string{}, log.NewNopLogger())
stream := historian.Stream{
Values: append(stream1.Values, stream2.Values...),
Stream: stream1.Stream,
}
items := store.annotationsFromStream(stream, annotation_ac.AccessResources{
Dashboards: map[string]int64{
dashboard1.UID: dashboard1.ID,
},
CanAccessOrgAnnotations: true,
})
require.Len(t, items, numTransitions)
for _, item := range items {
require.Zero(t, *item.DashboardUID)
require.Zero(t, item.DashboardID)
}
})
})
}
func TestHasAccess(t *testing.T) {
entry := historian.LokiEntry{
DashboardUID: "dashboard-uid",
}
t.Run("should return false when scope is organization and entry has dashboard UID", func(t *testing.T) {
require.False(t, hasAccess(entry, annotation_ac.AccessResources{
CanAccessOrgAnnotations: true,
}))
})
t.Run("should return false when scope is dashboard and dashboard UID is not in resources", func(t *testing.T) {
require.False(t, hasAccess(entry, annotation_ac.AccessResources{
CanAccessDashAnnotations: true,
Dashboards: map[string]int64{
"other-dashboard-uid": 1,
},
}))
})
t.Run("should return true when scope is organization and entry has no dashboard UID", func(t *testing.T) {
require.True(t, hasAccess(historian.LokiEntry{}, annotation_ac.AccessResources{
CanAccessOrgAnnotations: true,
}))
})
t.Run("should return true when scope is dashboard and dashboard UID is in resources", func(t *testing.T) {
require.True(t, hasAccess(entry, annotation_ac.AccessResources{
CanAccessDashAnnotations: true,
Dashboards: map[string]int64{
"dashboard-uid": 1,
},
}))
})
}
func TestFloat64Map(t *testing.T) {
t.Run(`should convert json string:float kv to Golang map[string]float64`, func(t *testing.T) {
jsonMap := simplejson.NewFromAny(map[string]any{
"key1": json.Number("1.0"),
"key2": json.Number("2.0"),
})
golangMap, err := numericMap[float64](jsonMap)
require.NoError(t, err)
require.Equal(t, map[string]float64{
"key1": 1.0,
"key2": 2.0,
}, golangMap)
})
t.Run("should return error when json map contains non-float values", func(t *testing.T) {
jsonMap := simplejson.NewFromAny(map[string]any{
"key1": json.Number("1.0"),
"key2": "not a float",
})
_, err := numericMap[float64](jsonMap)
require.Error(t, err)
})
}
func TestBuildHistoryQuery(t *testing.T) {
t.Run("should set dashboard UID from dashboard ID if query does not contain UID", func(t *testing.T) {
query := buildHistoryQuery(
&annotations.ItemQuery{
DashboardID: 1,
},
map[string]int64{
"dashboard-uid": 1,
},
"rule-uid",
)
require.Equal(t, "dashboard-uid", query.DashboardUID)
})
t.Run("should skip dashboard UID if missing from query and dashboard map", func(t *testing.T) {
query := buildHistoryQuery(
&annotations.ItemQuery{
DashboardID: 1,
},
map[string]int64{
"other-dashboard-uid": 2,
},
"rule-uid",
)
require.Zero(t, query.DashboardUID)
})
t.Run("should skip dashboard UID when not in query", func(t *testing.T) {
query := buildHistoryQuery(
&annotations.ItemQuery{},
map[string]int64{
"dashboard-uid": 1,
},
"rule-uid",
)
require.Zero(t, query.DashboardUID)
})
}
func TestBuildTransition(t *testing.T) {
t.Run("should return error when entry contains invalid state strings", func(t *testing.T) {
_, err := buildTransition(historian.LokiEntry{
Current: "Invalid",
})
require.Error(t, err)
_, err = buildTransition(historian.LokiEntry{
Current: "Normal",
Previous: "Invalid",
})
require.Error(t, err)
})
t.Run("should return error when values are not numbers", func(t *testing.T) {
_, err := buildTransition(historian.LokiEntry{
Current: "Normal",
Values: simplejson.NewFromAny(map[string]any{"key1": "not a float"}),
})
require.Error(t, err)
})
t.Run("should build transition correctly", func(t *testing.T) {
values := map[string]float64{
"key1": 1.0,
"key2": 2.0,
}
labels := map[string]string{
"key1": "value1",
"key2": "value2",
}
jsonValues := simplejson.New()
for k, v := range values {
jsonValues.Set(k, json.Number(strconv.FormatFloat(v, 'f', -1, 64)))
}
entry := historian.LokiEntry{
Current: "Normal",
Previous: "Error (NoData)",
Values: jsonValues,
InstanceLabels: labels,
}
expected := &state.StateTransition{
State: &state.State{
State: eval.Normal,
StateReason: "",
LastEvaluationTime: time.Time{},
Values: values,
Labels: labels,
},
PreviousState: eval.Error,
PreviousStateReason: eval.NoData.String(),
}
stub, err := buildTransition(entry)
require.NoError(t, err)
require.Equal(t, expected, stub)
})
}
func createTestLokiStore(t *testing.T, sql db.DB, client lokiQueryClient) *LokiHistorianStore {
t.Helper()
return &LokiHistorianStore{
client: client,
db: sql,
log: log.NewNopLogger(),
}
}
func createAlertRule(t *testing.T, sql db.DB, knownUIDs *sync.Map, title string) *ngmodels.AlertRule {
t.Helper()
if knownUIDs == nil {
knownUIDs = &sync.Map{}
}
generator := ngmodels.AlertRuleGen(
ngmodels.WithTitle(title),
ngmodels.WithUniqueUID(knownUIDs),
withDashboardUID(""), // no dashboard
ngmodels.WithUniqueID(),
ngmodels.WithOrgID(1),
)
rule := generator()
err := sql.WithDbSession(context.Background(), func(sess *db.Session) error {
_, err := sess.Table(ngmodels.AlertRule{}).InsertOne(rule)
if err != nil {
return err
}
dbRule := &ngmodels.AlertRule{}
exist, err := sess.Table(ngmodels.AlertRule{}).ID(rule.ID).Get(dbRule)
if err != nil {
return err
}
if !exist {
return errors.New("cannot read inserted record")
}
rule = dbRule
return nil
})
require.NoError(t, err)
return rule
}
func createAlertRuleWithDashboard(t *testing.T, sql db.DB, knownUIDs *sync.Map, title string, dashboardUID string) *ngmodels.AlertRule {
t.Helper()
if knownUIDs == nil {
knownUIDs = &sync.Map{}
}
generator := ngmodels.AlertRuleGen(
ngmodels.WithTitle(title),
ngmodels.WithUniqueUID(knownUIDs),
ngmodels.WithUniqueID(),
ngmodels.WithOrgID(1),
withDashboardUID(dashboardUID),
withPanelID(123),
)
rule := generator()
err := sql.WithDbSession(context.Background(), func(sess *db.Session) error {
_, err := sess.Table(ngmodels.AlertRule{}).InsertOne(rule)
if err != nil {
return err
}
dbRule := &ngmodels.AlertRule{}
exist, err := sess.Table(ngmodels.AlertRule{}).ID(rule.ID).Get(dbRule)
if err != nil {
return err
}
if !exist {
return errors.New("cannot read inserted record")
}
rule = dbRule
return nil
})
require.NoError(t, err)
return rule
}
func ruleMetaFromRule(t *testing.T, rule *ngmodels.AlertRule) historymodel.RuleMeta {
t.Helper()
meta := historymodel.RuleMeta{
OrgID: rule.OrgID,
UID: rule.UID,
ID: rule.ID,
}
if rule.DashboardUID != nil {
meta.DashboardUID = *rule.DashboardUID
}
if rule.PanelID != nil {
meta.PanelID = *rule.PanelID
}
return meta
}
func genStateTransitions(t *testing.T, num int, start time.Time) []state.StateTransition {
t.Helper()
transitions := make([]state.StateTransition, 0, num)
lastState := state.State{
State: eval.Normal,
StateReason: "",
LastEvaluationTime: start,
Values: map[string]float64{
"key1": 1.0,
"key2": 2.0,
},
Labels: map[string]string{
"key1": "value1",
"key2": "value2",
},
}
for i := 0; i < num; i++ {
stateVal := rand.Intn(4)
if stateVal == int(lastState.State) {
stateVal = (stateVal + 1) % 4
}
newState := state.State{
State: eval.State(stateVal),
StateReason: "",
LastEvaluationTime: lastState.LastEvaluationTime.Add(time.Second * time.Duration(i)),
Values: lastState.Values,
Labels: lastState.Labels,
}
transitions = append(transitions, state.StateTransition{
PreviousState: lastState.State,
PreviousStateReason: lastState.StateReason,
State: &newState,
})
lastState = newState
}
return transitions
}
func withDashboardUID(dashboardUID string) ngmodels.AlertRuleMutator {
return func(rule *ngmodels.AlertRule) {
rule.DashboardUID = &dashboardUID
}
}
func withPanelID(panelID int64) ngmodels.AlertRuleMutator {
return func(rule *ngmodels.AlertRule) {
rule.PanelID = &panelID
}
}
func compareAnnotationItem(t *testing.T, expected, actual *annotations.ItemDTO) {
require.Equal(t, expected.AlertID, actual.AlertID)
require.Equal(t, expected.AlertName, actual.AlertName)
if expected.PanelID != 0 {
require.Equal(t, expected.PanelID, actual.PanelID)
}
if expected.DashboardUID != nil {
require.Equal(t, expected.DashboardID, actual.DashboardID)
require.Equal(t, *expected.DashboardUID, *actual.DashboardUID)
}
require.Equal(t, expected.NewState, actual.NewState)
if expected.PrevState != "" {
require.Equal(t, expected.PrevState, actual.PrevState)
}
require.Equal(t, expected.Time, actual.Time)
if expected.Text != "" && expected.Data != nil {
require.Equal(t, expected.Text, actual.Text)
require.Equal(t, expected.Data, actual.Data)
}
}
type FakeLokiClient struct {
client client.Requester
cfg historian.LokiConfig
metrics *metrics.Historian
log log.Logger
Response []historian.Stream
}
func NewFakeLokiClient() *FakeLokiClient {
url, _ := url.Parse("http://some.url")
req := historian.NewFakeRequester()
metrics := metrics.NewHistorianMetrics(prometheus.NewRegistry(), "annotations_test")
return &FakeLokiClient{
client: client.NewTimedClient(req, metrics.WriteDuration),
cfg: historian.LokiConfig{
WritePathURL: url,
ReadPathURL: url,
Encoder: historian.JsonEncoder{},
},
metrics: metrics,
log: log.New("ngalert.state.historian", "backend", "loki"),
}
}
func (c *FakeLokiClient) RangeQuery(_ context.Context, _ string, from, to, _ int64) (historian.QueryRes, error) {
streams := make([]historian.Stream, len(c.Response))
for n, stream := range c.Response {
streams[n].Stream = stream.Stream
streams[n].Values = []historian.Sample{}
for _, sample := range stream.Values {
if sample.T.UnixNano() < from || sample.T.UnixNano() >= to { // matches Loki behavior
continue
}
streams[n].Values = append(streams[n].Values, sample)
}
}
res := historian.QueryRes{
Data: historian.QueryData{
Result: streams,
},
}
// reset expected streams on read
c.Response = []historian.Stream{}
return res, nil
}
func TestUseStore(t *testing.T) {
t.Run("false if state history disabled", func(t *testing.T) {
cfg := setting.UnifiedAlertingStateHistorySettings{

View File

@ -275,6 +275,23 @@ func (s State) String() string {
return [...]string{"Normal", "Alerting", "Pending", "NoData", "Error"}[s]
}
func ParseStateString(repr string) (State, error) {
switch strings.ToLower(repr) {
case "normal":
return Normal, nil
case "alerting":
return Alerting, nil
case "pending":
return Pending, nil
case "nodata":
return NoData, nil
case "error":
return Error, nil
default:
return -1, fmt.Errorf("invalid state: %s", repr)
}
}
func buildDatasourceHeaders(ctx context.Context) map[string]string {
headers := map[string]string{
// Many data sources check this in query method as sometimes alerting needs special considerations.

View File

@ -173,12 +173,12 @@ func (h *AnnotationBackend) Query(ctx context.Context, query ngmodels.HistoryQue
func buildAnnotations(rule history_model.RuleMeta, states []state.StateTransition, logger log.Logger) []annotations.Item {
items := make([]annotations.Item, 0, len(states))
for _, state := range states {
if !shouldRecordAnnotation(state) {
if !ShouldRecordAnnotation(state) {
continue
}
logger.Debug("Alert state changed creating annotation", "newState", state.Formatted(), "oldState", state.PreviousFormatted())
annotationText, annotationData := buildAnnotationTextAndData(rule, state.State)
annotationText, annotationData := BuildAnnotationTextAndData(rule, state.State)
item := annotations.Item{
AlertID: rule.ID,
@ -195,7 +195,7 @@ func buildAnnotations(rule history_model.RuleMeta, states []state.StateTransitio
return items
}
func buildAnnotationTextAndData(rule history_model.RuleMeta, currentState *state.State) (string, *simplejson.Json) {
func BuildAnnotationTextAndData(rule history_model.RuleMeta, currentState *state.State) (string, *simplejson.Json) {
jsonData := simplejson.New()
var value string

View File

@ -34,9 +34,9 @@ func shouldRecord(transition state.StateTransition) bool {
return true
}
// shouldRecordAnnotation returns true if an annotation should be created for a given state transition.
// ShouldRecordAnnotation returns true if an annotation should be created for a given state transition.
// This is stricter than shouldRecord to avoid cluttering panels with state transitions.
func shouldRecordAnnotation(t state.StateTransition) bool {
func ShouldRecordAnnotation(t state.StateTransition) bool {
if !shouldRecord(t) {
return false
}

View File

@ -110,8 +110,8 @@ func TestShouldRecordAnnotation(t *testing.T) {
forward := transition(eval.Normal, "", eval.Normal, models.StateReasonNoData)
backward := transition(eval.Normal, models.StateReasonNoData, eval.Normal, "")
require.False(t, shouldRecordAnnotation(forward), "Normal -> Normal(NoData) should be false")
require.False(t, shouldRecordAnnotation(backward), "Normal(NoData) -> Normal should be false")
require.False(t, ShouldRecordAnnotation(forward), "Normal -> Normal(NoData) should be false")
require.False(t, ShouldRecordAnnotation(backward), "Normal(NoData) -> Normal should be false")
})
t.Run("other Normal transitions involving NoData still recorded", func(t *testing.T) {
@ -121,11 +121,11 @@ func TestShouldRecordAnnotation(t *testing.T) {
errorBackward := transition(eval.Normal, models.StateReasonError, eval.Normal, models.StateReasonNoData)
missingSeriesBackward := transition(eval.Normal, models.StateReasonMissingSeries, eval.Normal, models.StateReasonNoData)
require.True(t, shouldRecordAnnotation(pauseForward), "Normal(NoData) -> Normal(Paused) should be true")
require.True(t, shouldRecordAnnotation(pauseBackward), "Normal(Paused) -> Normal(NoData) should be true")
require.True(t, shouldRecordAnnotation(errorForward), "Normal(NoData) -> Normal(Error) should be true")
require.True(t, shouldRecordAnnotation(errorBackward), "Normal(Error) -> Normal(NoData) should be true")
require.True(t, shouldRecordAnnotation(missingSeriesBackward), "Normal(MissingSeries) -> Normal(NoData) should be true")
require.True(t, ShouldRecordAnnotation(pauseForward), "Normal(NoData) -> Normal(Paused) should be true")
require.True(t, ShouldRecordAnnotation(pauseBackward), "Normal(Paused) -> Normal(NoData) should be true")
require.True(t, ShouldRecordAnnotation(errorForward), "Normal(NoData) -> Normal(Error) should be true")
require.True(t, ShouldRecordAnnotation(errorBackward), "Normal(Error) -> Normal(NoData) should be true")
require.True(t, ShouldRecordAnnotation(missingSeriesBackward), "Normal(MissingSeries) -> Normal(NoData) should be true")
})
t.Run("respects filters in shouldRecord()", func(t *testing.T) {
@ -133,19 +133,19 @@ func TestShouldRecordAnnotation(t *testing.T) {
unpause := transition(eval.Normal, models.StateReasonPaused, eval.Normal, "")
afterUpdate := transition(eval.Normal, models.StateReasonUpdated, eval.Normal, "")
require.False(t, shouldRecordAnnotation(missingSeries), "Normal -> Normal(MissingSeries) should be false")
require.False(t, shouldRecordAnnotation(unpause), "Normal(Paused) -> Normal should be false")
require.False(t, shouldRecordAnnotation(afterUpdate), "Normal(Updated) -> Normal should be false")
require.False(t, ShouldRecordAnnotation(missingSeries), "Normal -> Normal(MissingSeries) should be false")
require.False(t, ShouldRecordAnnotation(unpause), "Normal(Paused) -> Normal should be false")
require.False(t, ShouldRecordAnnotation(afterUpdate), "Normal(Updated) -> Normal should be false")
// Smoke test a few basic ones, exhaustive tests for shouldRecord() already exist elsewhere.
basicPending := transition(eval.Normal, "", eval.Pending, "")
basicAlerting := transition(eval.Pending, "", eval.Alerting, "")
basicResolve := transition(eval.Alerting, "", eval.Normal, "")
basicError := transition(eval.Normal, "", eval.Error, "")
require.True(t, shouldRecordAnnotation(basicPending), "Normal -> Pending should be true")
require.True(t, shouldRecordAnnotation(basicAlerting), "Pending -> Alerting should be true")
require.True(t, shouldRecordAnnotation(basicResolve), "Alerting -> Normal should be true")
require.True(t, shouldRecordAnnotation(basicError), "Normal -> Error should be true")
require.True(t, ShouldRecordAnnotation(basicPending), "Normal -> Pending should be true")
require.True(t, ShouldRecordAnnotation(basicAlerting), "Pending -> Alerting should be true")
require.True(t, ShouldRecordAnnotation(basicResolve), "Alerting -> Normal should be true")
require.True(t, ShouldRecordAnnotation(basicError), "Normal -> Error should be true")
})
}

View File

@ -73,7 +73,7 @@ func (h *RemoteLokiBackend) TestConnection(ctx context.Context) error {
// Record writes a number of state transitions for a given rule to an external Loki instance.
func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error {
logger := h.log.FromContext(ctx)
logStream := statesToStream(rule, states, h.externalLabels, logger)
logStream := StatesToStream(rule, states, h.externalLabels, logger)
errCh := make(chan error, 1)
if len(logStream.Values) == 0 {
@ -112,7 +112,7 @@ func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleM
// Query retrieves state history entries from an external Loki instance and formats the results into a dataframe.
func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
logQL, err := buildLogQuery(query)
logQL, err := BuildLogQuery(query)
if err != nil {
return nil, err
}
@ -200,7 +200,7 @@ func merge(res QueryRes, ruleUID string) (*data.Frame, error) {
if minElStreamIdx == -1 {
break
}
var entry lokiEntry
var entry LokiEntry
err := json.Unmarshal([]byte(minEl.V), &entry)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal entry: %w", err)
@ -231,7 +231,7 @@ func merge(res QueryRes, ruleUID string) (*data.Frame, error) {
return frame, nil
}
func statesToStream(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) Stream {
func StatesToStream(rule history_model.RuleMeta, states []state.StateTransition, externalLabels map[string]string, logger log.Logger) Stream {
labels := mergeLabels(make(map[string]string), externalLabels)
// System-defined labels take precedence over user-defined external labels.
labels[StateHistoryLabelKey] = StateHistoryLabelValue
@ -246,7 +246,7 @@ func statesToStream(rule history_model.RuleMeta, states []state.StateTransition,
}
sanitizedLabels := removePrivateLabels(state.Labels)
entry := lokiEntry{
entry := LokiEntry{
SchemaVersion: 1,
Previous: state.PreviousFormatted(),
Current: state.Formatted(),
@ -292,7 +292,7 @@ func (h *RemoteLokiBackend) recordStreams(ctx context.Context, streams []Stream,
return nil
}
type lokiEntry struct {
type LokiEntry struct {
SchemaVersion int `json:"schemaVersion"`
Previous string `json:"previous"`
Current string `json:"current"`
@ -322,7 +322,7 @@ func jsonifyRow(line string) (json.RawMessage, error) {
// Ser/deser to validate the contents of the log line before shipping it forward.
// TODO: We may want to remove this in the future, as we already have the value in the form of a []byte, and json.RawMessage is also a []byte.
// TODO: Though, if the log line does not contain valid JSON, this can cause problems later on when rendering the dataframe.
var entry lokiEntry
var entry LokiEntry
if err := json.Unmarshal([]byte(line), &entry); err != nil {
return nil, err
}
@ -366,7 +366,7 @@ func isValidOperator(op string) bool {
return false
}
func buildLogQuery(query models.HistoryQuery) (string, error) {
func BuildLogQuery(query models.HistoryQuery) (string, error) {
selectors, err := buildSelectors(query)
if err != nil {
return "", fmt.Errorf("failed to build the provided selectors: %w", err)

View File

@ -31,7 +31,7 @@ func TestRemoteLokiBackend(t *testing.T) {
l := log.NewNopLogger()
states := singleFromNormal(&state.State{State: eval.Normal})
res := statesToStream(rule, states, nil, l)
res := StatesToStream(rule, states, nil, l)
require.Empty(t, res.Values)
})
@ -41,7 +41,7 @@ func TestRemoteLokiBackend(t *testing.T) {
l := log.NewNopLogger()
states := singleFromNormal(&state.State{State: eval.Error, Error: fmt.Errorf("oh no")})
res := statesToStream(rule, states, nil, l)
res := StatesToStream(rule, states, nil, l)
entry := requireSingleEntry(t, res)
require.Contains(t, entry.Error, "oh no")
@ -52,7 +52,7 @@ func TestRemoteLokiBackend(t *testing.T) {
l := log.NewNopLogger()
states := singleFromNormal(&state.State{State: eval.NoData})
res := statesToStream(rule, states, nil, l)
res := StatesToStream(rule, states, nil, l)
_ = requireSingleEntry(t, res)
})
@ -65,7 +65,7 @@ func TestRemoteLokiBackend(t *testing.T) {
Labels: data.Labels{"a": "b"},
})
res := statesToStream(rule, states, nil, l)
res := StatesToStream(rule, states, nil, l)
exp := map[string]string{
StateHistoryLabelKey: StateHistoryLabelValue,
@ -84,7 +84,7 @@ func TestRemoteLokiBackend(t *testing.T) {
Labels: data.Labels{"__private__": "b"},
})
res := statesToStream(rule, states, nil, l)
res := StatesToStream(rule, states, nil, l)
require.NotContains(t, res.Stream, "__private__")
})
@ -97,7 +97,8 @@ func TestRemoteLokiBackend(t *testing.T) {
Labels: data.Labels{"a": "b"},
})
res := statesToStream(rule, states, nil, l)
res := StatesToStream(rule, states, nil, l)
entry := requireSingleEntry(t, res)
require.Equal(t, rule.Title, entry.RuleTitle)
@ -113,7 +114,7 @@ func TestRemoteLokiBackend(t *testing.T) {
Labels: data.Labels{"statelabel": "labelvalue"},
})
res := statesToStream(rule, states, nil, l)
res := StatesToStream(rule, states, nil, l)
entry := requireSingleEntry(t, res)
require.Contains(t, entry.InstanceLabels, "statelabel")
@ -131,7 +132,7 @@ func TestRemoteLokiBackend(t *testing.T) {
},
})
res := statesToStream(rule, states, nil, l)
res := StatesToStream(rule, states, nil, l)
entry := requireSingleEntry(t, res)
require.Len(t, entry.InstanceLabels, 3)
@ -145,7 +146,7 @@ func TestRemoteLokiBackend(t *testing.T) {
Values: map[string]float64{"A": 2.0, "B": 5.5},
})
res := statesToStream(rule, states, nil, l)
res := StatesToStream(rule, states, nil, l)
entry := requireSingleEntry(t, res)
require.NotNil(t, entry.Values)
@ -164,7 +165,7 @@ func TestRemoteLokiBackend(t *testing.T) {
Labels: data.Labels{"a": "b"},
})
res := statesToStream(rule, states, nil, l)
res := StatesToStream(rule, states, nil, l)
entry := requireSingleEntry(t, res)
require.Equal(t, rule.Condition, entry.Condition)
@ -182,7 +183,7 @@ func TestRemoteLokiBackend(t *testing.T) {
},
})
res := statesToStream(rule, states, nil, l)
res := StatesToStream(rule, states, nil, l)
entry := requireSingleEntry(t, res)
exp := labelFingerprint(states[0].Labels)
@ -281,7 +282,7 @@ func TestRemoteLokiBackend(t *testing.T) {
for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
res, err := buildLogQuery(tc.query)
res, err := BuildLogQuery(tc.query)
require.NoError(t, err)
require.Equal(t, tc.exp, res)
})
@ -537,15 +538,15 @@ func createTestRule() history_model.RuleMeta {
}
}
func requireSingleEntry(t *testing.T, res Stream) lokiEntry {
func requireSingleEntry(t *testing.T, res Stream) LokiEntry {
require.Len(t, res.Values, 1)
return requireEntry(t, res.Values[0])
}
func requireEntry(t *testing.T, row Sample) lokiEntry {
func requireEntry(t *testing.T, row Sample) LokiEntry {
t.Helper()
var entry lokiEntry
var entry LokiEntry
err := json.Unmarshal([]byte(row.V), &entry)
require.NoError(t, err)
return entry

View File

@ -482,6 +482,27 @@ func FormatStateAndReason(state eval.State, reason string) string {
return s
}
// ParseFormattedState parses a state string in the format "state (reason)"
// and returns the state and reason separately.
func ParseFormattedState(stateStr string) (eval.State, string, error) {
split := strings.Split(stateStr, " ")
if len(split) == 0 {
return -1, "", errors.New("invalid state format")
}
state, err := eval.ParseStateString(split[0])
if err != nil {
return -1, "", err
}
var reason string
if len(split) > 1 {
reason = strings.Trim(split[1], "()")
}
return state, reason, nil
}
// GetRuleExtraLabels returns a map of built-in labels that should be added to an alert before it is sent to the Alertmanager or its state is cached.
func GetRuleExtraLabels(rule *models.AlertRule, folderTitle string, includeFolder bool) map[string]string {
extraLabels := make(map[string]string, 4)

View File

@ -666,3 +666,26 @@ func TestTakeImage(t *testing.T) {
assert.Equal(t, ngmodels.Image{Path: "foo.png"}, *image)
})
}
func TestParseFormattedState(t *testing.T) {
t.Run("should parse formatted state", func(t *testing.T) {
stateStr := "Normal (MissingSeries)"
s, reason, err := ParseFormattedState(stateStr)
require.NoError(t, err)
require.Equal(t, eval.Normal, s)
require.Equal(t, ngmodels.StateReasonMissingSeries, reason)
})
t.Run("should error on empty string", func(t *testing.T) {
stateStr := ""
_, _, err := ParseFormattedState(stateStr)
require.Error(t, err)
})
t.Run("should error on invalid string content", func(t *testing.T) {
stateStr := "NotAState"
_, _, err := ParseFormattedState(stateStr)
require.Error(t, err)
})
}