Alerting: Create query interface for state history along with annotation-based implementation (#61646)

This commit is contained in:
Alexander Weaver 2023-01-19 03:45:31 -06:00 committed by GitHub
parent cacc55ba06
commit c10713ea76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 204 additions and 6 deletions

View File

@ -0,0 +1,12 @@
package models
import "time"
// HistoryQuery represents a query for alert state history.
type HistoryQuery struct {
RuleUID string
OrgID int64
Labels map[string]string
From time.Time
To time.Time
}

View File

@ -205,7 +205,7 @@ func (ng *AlertNG) init() error {
Tracer: ng.tracer,
}
history, err := configureHistorianBackend(ng.Cfg.UnifiedAlerting.StateHistory, ng.annotationsRepo, ng.dashboardService)
history, err := configureHistorianBackend(ng.Cfg.UnifiedAlerting.StateHistory, ng.annotationsRepo, ng.dashboardService, ng.store)
if err != nil {
return err
}
@ -378,13 +378,13 @@ func readQuotaConfig(cfg *setting.Cfg) (*quota.Map, error) {
return limits, nil
}
func configureHistorianBackend(cfg setting.UnifiedAlertingStateHistorySettings, ar annotations.Repository, ds dashboards.DashboardService) (state.Historian, error) {
func configureHistorianBackend(cfg setting.UnifiedAlertingStateHistorySettings, ar annotations.Repository, ds dashboards.DashboardService, rs historian.RuleStore) (state.Historian, error) {
if !cfg.Enabled {
return historian.NewNopHistorian(), nil
}
if cfg.Backend == "annotations" {
return historian.NewAnnotationBackend(ar, ds), nil
return historian.NewAnnotationBackend(ar, ds, rs), nil
}
if cfg.Backend == "loki" {
baseURL, err := url.Parse(cfg.LokiRemoteURL)

View File

@ -2,11 +2,13 @@ package historian
import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/annotations"
@ -20,13 +22,19 @@ import (
type AnnotationBackend struct {
annotations annotations.Repository
dashboards *dashboardResolver
rules RuleStore
log log.Logger
}
func NewAnnotationBackend(annotations annotations.Repository, dashboards dashboards.DashboardService) *AnnotationBackend {
type RuleStore interface {
GetAlertRuleByUID(ctx context.Context, query *ngmodels.GetAlertRuleByUIDQuery) error
}
func NewAnnotationBackend(annotations annotations.Repository, dashboards dashboards.DashboardService, rules RuleStore) *AnnotationBackend {
return &AnnotationBackend{
annotations: annotations,
dashboards: newDashboardResolver(dashboards, defaultDashboardCacheExpiry),
rules: rules,
log: log.New("ngalert.state.historian"),
}
}
@ -40,6 +48,89 @@ func (h *AnnotationBackend) RecordStatesAsync(ctx context.Context, rule *ngmodel
go h.recordAnnotationsSync(ctx, panel, annotations, logger)
}
func (h *AnnotationBackend) QueryStates(ctx context.Context, query ngmodels.HistoryQuery) (*data.Frame, error) {
logger := h.log.FromContext(ctx)
if query.RuleUID == "" {
return nil, fmt.Errorf("ruleUID is required to query annotations")
}
if query.Labels != nil {
logger.Warn("Annotation state history backend does not support label queries, ignoring that filter")
}
rq := ngmodels.GetAlertRuleByUIDQuery{
UID: query.RuleUID,
OrgID: query.OrgID,
}
err := h.rules.GetAlertRuleByUID(ctx, &rq)
if err != nil {
return nil, fmt.Errorf("failed to look up the requested rule")
}
if rq.Result == nil {
return nil, fmt.Errorf("no such rule exists")
}
q := annotations.ItemQuery{
AlertId: rq.Result.ID,
OrgId: query.OrgID,
From: query.From.Unix(),
To: query.To.Unix(),
}
items, err := h.annotations.Find(ctx, &q)
if err != nil {
return nil, fmt.Errorf("failed to query annotations for state history: %w", err)
}
frame := data.NewFrame("states")
// Annotations only support querying for a single rule's history.
// Since we are guaranteed to have a single rule, we can return it as a single series.
// Also, annotations don't store labels in a strongly defined format. They are formatted into the label's text.
// We are not guaranteed that a given annotation has parseable text, so we instead use the entire text as an opaque value.
lbls := data.Labels(map[string]string{
"from": "state-history",
"ruleUID": fmt.Sprint(query.RuleUID),
})
// TODO: In the future, we probably want to have one series per unique text string, instead. For simplicity, let's just make it a new column.
//
// TODO: This is a really naive mapping that will evolve in the next couple changes.
// TODO: It will converge over time with the other implementations.
//
// We represent state history as five vectors:
// 1. `time` - when the transition happened
// 2. `text` - a text string containing metadata about the rule
// 3. `prev` - the previous state and reason
// 4. `next` - the next state and reason
// 5. `data` - a JSON string, containing the annotation's contents. analogous to item.Data
times := make([]time.Time, 0, len(items))
texts := make([]string, 0, len(items))
prevStates := make([]string, 0, len(items))
nextStates := make([]string, 0, len(items))
values := make([]string, 0, len(items))
for _, item := range items {
data, err := json.Marshal(item.Data)
if err != nil {
logger.Error("Annotation service gave an annotation with unparseable data, skipping", "id", item.Id, "err", err)
continue
}
times = append(times, time.Unix(item.Time, 0))
texts = append(texts, item.Text)
prevStates = append(prevStates, item.PrevState)
nextStates = append(nextStates, item.NewState)
values = append(values, string(data))
}
frame.Fields = append(frame.Fields, data.NewField("time", lbls, times))
frame.Fields = append(frame.Fields, data.NewField("text", lbls, texts))
frame.Fields = append(frame.Fields, data.NewField("prev", lbls, prevStates))
frame.Fields = append(frame.Fields, data.NewField("next", lbls, nextStates))
frame.Fields = append(frame.Fields, data.NewField("data", lbls, values))
return frame, nil
}
func (h *AnnotationBackend) buildAnnotations(rule *ngmodels.AlertRule, states []state.StateTransition, logger log.Logger) []annotations.Item {
items := make([]annotations.Item, 0, len(states))
for _, state := range states {

View File

@ -0,0 +1,70 @@
package historian
import (
"context"
"testing"
"time"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/annotations"
"github.com/grafana/grafana/pkg/services/annotations/annotationstest"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/tests/fakes"
"github.com/stretchr/testify/require"
)
func TestAnnotationHistorian_Integration(t *testing.T) {
t.Run("alert annotations are queryable", func(t *testing.T) {
anns := createTestAnnotationBackendSut(t)
items := []annotations.Item{createAnnotation()}
anns.recordAnnotationsSync(context.Background(), nil, items, log.NewNopLogger())
q := models.HistoryQuery{
RuleUID: "my-rule",
OrgID: 1,
}
frame, err := anns.QueryStates(context.Background(), q)
require.NoError(t, err)
require.NotNil(t, frame)
require.Len(t, frame.Fields, 5)
for i := 0; i < 5; i++ {
require.Equal(t, frame.Fields[i].Len(), 1)
}
})
}
func createTestAnnotationBackendSut(t *testing.T) *AnnotationBackend {
t.Helper()
fakeAnnoRepo := annotationstest.NewFakeAnnotationsRepo()
rules := fakes.NewRuleStore(t)
rules.Rules[1] = []*models.AlertRule{
models.AlertRuleGen(withOrgID(1), withUID("my-rule"))(),
}
return NewAnnotationBackend(fakeAnnoRepo, &dashboards.FakeDashboardService{}, rules)
}
func createAnnotation() annotations.Item {
return annotations.Item{
Id: 1,
OrgId: 1,
AlertId: 1,
Text: "MyAlert {a=b} - No data",
Data: simplejson.New(),
Epoch: time.Now().UnixNano() / int64(time.Millisecond),
}
}
func withOrgID(orgId int64) func(rule *models.AlertRule) {
return func(rule *models.AlertRule) {
rule.OrgID = orgId
}
}
func withUID(uid string) func(rule *models.AlertRule) {
return func(rule *models.AlertRule) {
rule.UID = uid
}
}

View File

@ -3,6 +3,7 @@ package historian
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
@ -33,3 +34,7 @@ func (h *RemoteLokiBackend) RecordStatesAsync(ctx context.Context, _ *models.Ale
logger := h.log.FromContext(ctx)
logger.Debug("Remote Loki state history backend was called with states")
}
func (h *RemoteLokiBackend) QueryStates(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
return data.NewFrame("states"), nil
}

View File

@ -0,0 +1,15 @@
package historian
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/services/ngalert/models"
)
// Querier represents the ability to query state history.
// TODO: This package also contains implementations of this interface.
// TODO: This type should be moved to the side of the consumer, when the consumer is created in the future. We add it here temporarily to more clearly define this package's interface.
type Querier interface {
QueryStates(ctx context.Context, query models.HistoryQuery) (*data.Frame, error)
}

View File

@ -3,6 +3,7 @@ package historian
import (
"context"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
@ -20,3 +21,7 @@ func NewSqlBackend() *SqlBackend {
func (h *SqlBackend) RecordStatesAsync(ctx context.Context, _ *models.AlertRule, _ []state.StateTransition) {
}
func (h *SqlBackend) QueryStates(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
return data.NewFrame("states"), nil
}

View File

@ -219,7 +219,7 @@ func TestDashboardAnnotations(t *testing.T) {
_, dbstore := tests.SetupTestEnv(t, 1)
fakeAnnoRepo := annotationstest.NewFakeAnnotationsRepo()
hist := historian.NewAnnotationBackend(fakeAnnoRepo, &dashboards.FakeDashboardService{})
hist := historian.NewAnnotationBackend(fakeAnnoRepo, &dashboards.FakeDashboardService{}, nil)
cfg := state.ManagerCfg{
Metrics: testMetrics.GetStateMetrics(),
ExternalURL: nil,
@ -2208,7 +2208,7 @@ func TestProcessEvalResults(t *testing.T) {
for _, tc := range testCases {
fakeAnnoRepo := annotationstest.NewFakeAnnotationsRepo()
hist := historian.NewAnnotationBackend(fakeAnnoRepo, &dashboards.FakeDashboardService{})
hist := historian.NewAnnotationBackend(fakeAnnoRepo, &dashboards.FakeDashboardService{}, nil)
cfg := state.ManagerCfg{
Metrics: testMetrics.GetStateMetrics(),
ExternalURL: nil,