Alerting: Persist annotations from multidimensional rules in batches (#56575)

* Reduce piecemeal state fields

* Read data directly off state instead of rule

* Unify state and context into single struct

* Expose contextual information to layer above setNextState

* Work in terms of ContextualState and call historian in batches

* Call annotations service in batches

* Export format state and reason and remove workaround in unrelated test package

* Add new method to annotation service for batch inserting

* Fix loop variable aliasing bug caught by linter, didn't change behavior

* Incl timerange on annotation tests

* Insert one at a time if tags are present

* Point to rule from ContextualState rather than copy fields

* Build annotations and copy data prior to starting goroutine

* Rename to StateTransition

* Use new bulk-insert utility

* Remove rule from StateTransition and pass in directly to historian

* Simplify annotations logic since we have only one rule

* Fix logs and context, nilcheck, simplify method name

* Regenerate mock
This commit is contained in:
Alexander Weaver
2022-11-04 10:39:26 -05:00
committed by GitHub
parent c1ea944c79
commit cc8c1380e2
14 changed files with 284 additions and 85 deletions

View File

@@ -16,6 +16,7 @@ var (
//go:generate mockery --name Repository --structname FakeAnnotationsRepo --inpackage --filename annotations_repository_mock.go
type Repository interface {
Save(ctx context.Context, item *Item) error
SaveMany(ctx context.Context, items []Item) error
Update(ctx context.Context, item *Item) error
Find(ctx context.Context, query *ItemQuery) ([]*ItemDTO, error)
Delete(ctx context.Context, params *DeleteParams) error

View File

@@ -1,4 +1,4 @@
// Code generated by mockery v2.12.1. DO NOT EDIT.
// Code generated by mockery v2.12.0. DO NOT EDIT.
package annotations
@@ -86,6 +86,20 @@ func (_m *FakeAnnotationsRepo) Save(ctx context.Context, item *Item) error {
return r0
}
// SaveMany provides a mock function with given fields: ctx, items
func (_m *FakeAnnotationsRepo) SaveMany(ctx context.Context, items []Item) error {
ret := _m.Called(ctx, items)
var r0 error
if rf, ok := ret.Get(0).(func(context.Context, []Item) error); ok {
r0 = rf(ctx, items)
} else {
r0 = ret.Error(0)
}
return r0
}
// Update provides a mock function with given fields: ctx, item
func (_m *FakeAnnotationsRepo) Update(ctx context.Context, item *Item) error {
ret := _m.Called(ctx, item)

View File

@@ -30,6 +30,12 @@ func (r *RepositoryImpl) Save(ctx context.Context, item *annotations.Item) error
return r.store.Add(ctx, item)
}
// SaveMany inserts multiple annotations at once by delegating to the underlying store's AddMany.
// It does not return IDs associated with created annotations. If you need this functionality, use the single-item Save instead.
func (r *RepositoryImpl) SaveMany(ctx context.Context, items []annotations.Item) error {
return r.store.AddMany(ctx, items)
}
func (r *RepositoryImpl) Update(ctx context.Context, item *annotations.Item) error {
return r.store.Update(ctx, item)
}

View File

@@ -8,7 +8,8 @@ import (
)
type store interface {
Add(ctx context.Context, item *annotations.Item) error
Add(ctx context.Context, items *annotations.Item) error
AddMany(ctx context.Context, items []annotations.Item) error
Update(ctx context.Context, item *annotations.Item) error
Get(ctx context.Context, query *annotations.ItemQuery) ([]*annotations.ItemDTO, error)
Delete(ctx context.Context, params *annotations.DeleteParams) error

View File

@@ -13,6 +13,7 @@ import (
"github.com/grafana/grafana/pkg/models"
ac "github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/annotations"
"github.com/grafana/grafana/pkg/services/sqlstore"
"github.com/grafana/grafana/pkg/services/sqlstore/permissions"
"github.com/grafana/grafana/pkg/services/sqlstore/searchstore"
"github.com/grafana/grafana/pkg/services/tag"
@@ -63,9 +64,64 @@ func (r *xormRepositoryImpl) Add(ctx context.Context, item *annotations.Item) er
if _, err := sess.Table("annotation").Insert(item); err != nil {
return err
}
return r.synchronizeTags(ctx, item)
})
}
// AddMany inserts large batches of annotations at once.
//
// It does not return IDs associated with created annotations; the caller's
// slice is never modified. If you need the IDs, use the single-item Add instead.
// This is due to a limitation with some supported databases:
// We cannot correlate the IDs of batch-inserted records without acquiring a full table lock in MySQL.
// Annotations have no other uniquifier field, so we also cannot re-query for them after the fact.
// So, callers can only reliably use this endpoint if they don't care about returned IDs.
//
// Annotations without tags are written with a single bulk insert. Annotations
// with tags are inserted one at a time, because their generated ID is needed
// to synchronize the tags afterwards.
//
// An error is returned as soon as any item fails validation (before anything
// is written) or any insert / tag synchronization fails.
func (r *xormRepositoryImpl) AddMany(ctx context.Context, items []annotations.Item) error {
	hasTags := make([]annotations.Item, 0)
	hasNoTags := make([]annotations.Item, 0, len(items))

	for i := range items {
		// Work on a copy so the caller's slice is left untouched.
		item := items[i]

		tags := tag.ParseTagPairs(item.Tags)
		item.Tags = tag.JoinTagPairs(tags)
		item.Created = timeNow().UnixNano() / int64(time.Millisecond)
		item.Updated = item.Created
		if item.Epoch == 0 {
			item.Epoch = item.Created
		}

		// Validate the normalized copy, i.e. exactly what is about to be
		// written, rather than the caller's un-normalized original.
		if err := r.validateItem(&item); err != nil {
			return err
		}

		if len(item.Tags) > 0 {
			hasTags = append(hasTags, item)
		} else {
			hasNoTags = append(hasNoTags, item)
		}
	}

	return r.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
		// We can batch-insert every annotation with no tags. If an annotation has tags, we need the ID.
		opts := sqlstore.NativeSettingsForDialect(r.db.GetDialect())
		if _, err := sess.BulkInsert("annotation", hasNoTags, opts); err != nil {
			return err
		}

		for i := range hasTags {
			// Insert through a pointer: the generated ID can only be written
			// back into the struct when the ORM receives an addressable value.
			// Passing a copy would leave Id at 0 for the tag synchronization.
			if _, err := sess.Table("annotation").Insert(&hasTags[i]); err != nil {
				return err
			}
			if err := r.synchronizeTags(ctx, &hasTags[i]); err != nil {
				return err
			}
		}
		return nil
	})
}
func (r *xormRepositoryImpl) synchronizeTags(ctx context.Context, item *annotations.Item) error {
// Will re-use session if one has already been opened with the same ctx.
return r.db.WithDbSession(ctx, func(sess *sqlstore.DBSession) error {
if item.Tags != nil {
tags, err := r.tagService.EnsureTagsExist(ctx, tags)
tags, err := r.tagService.EnsureTagsExist(ctx, tag.ParseTagPairs(item.Tags))
if err != nil {
return err
}

View File

@@ -163,6 +163,47 @@ func TestIntegrationAnnotations(t *testing.T) {
require.Error(t, err)
require.ErrorIs(t, err, annotations.ErrBaseTagLimitExceeded)
t.Run("Can batch-insert annotations", func(t *testing.T) {
count := 10
items := make([]annotations.Item, count)
for i := 0; i < count; i++ {
items[i] = annotations.Item{
OrgId: 100,
Type: "batch",
Epoch: 12,
}
}
err := repo.AddMany(context.Background(), items)
require.NoError(t, err)
query := &annotations.ItemQuery{OrgId: 100, SignedInUser: testUser}
inserted, err := repo.Get(context.Background(), query)
require.NoError(t, err)
assert.Len(t, inserted, count)
})
t.Run("Can batch-insert annotations with tags", func(t *testing.T) {
count := 10
items := make([]annotations.Item, count)
for i := 0; i < count; i++ {
items[i] = annotations.Item{
OrgId: 101,
Type: "batch",
Epoch: 12,
}
}
items[0].Tags = []string{"type:test"}
err := repo.AddMany(context.Background(), items)
require.NoError(t, err)
query := &annotations.ItemQuery{OrgId: 101, SignedInUser: testUser}
inserted, err := repo.Get(context.Background(), query)
require.NoError(t, err)
assert.Len(t, inserted, count)
})
t.Run("Can query for annotation by id", func(t *testing.T) {
items, err := repo.Get(context.Background(), &annotations.ItemQuery{
OrgId: 1,
@@ -448,6 +489,7 @@ func TestIntegrationAnnotationListingWithRBAC(t *testing.T) {
OrgId: 1,
DashboardId: 2,
Epoch: 10,
Tags: []string{"foo:bar"},
}
err = repo.Add(context.Background(), dash2Annotation)
require.NoError(t, err)

View File

@@ -43,6 +43,21 @@ func (repo *fakeAnnotationsRepo) Save(ctx context.Context, item *annotations.Ite
item.Id = int64(len(repo.annotations) + 1)
}
repo.annotations[item.Id] = *item
return nil
}
// SaveMany stores every given item in the fake's in-memory map while holding
// the repo mutex. An item whose Id is zero is first given an ID equal to the
// current number of stored annotations plus one. The caller's slice is not
// modified; only the stored copies carry the assigned IDs. It always returns nil.
func (repo *fakeAnnotationsRepo) SaveMany(ctx context.Context, items []annotations.Item) error {
	repo.mtx.Lock()
	defer repo.mtx.Unlock()

	for idx := range items {
		entry := items[idx]
		if entry.Id == 0 {
			entry.Id = int64(len(repo.annotations) + 1)
		}
		repo.annotations[entry.Id] = entry
	}

	return nil
}

View File

@@ -62,7 +62,7 @@ type DeleteParams struct {
}
type Item struct {
Id int64 `json:"id"`
Id int64 `json:"id" xorm:"pk autoincr 'id'"`
OrgId int64 `json:"orgId"`
UserId int64 `json:"userId"`
DashboardId int64 `json:"dashboardId"`

View File

@@ -60,11 +60,7 @@ func (srv PrometheusSrv) RouteGetAlertStatuses(c *models.ReqContext) response.Re
// TODO: or should we make this two fields? Using one field lets the
// frontend use the same logic for parsing text on annotations and this.
State: state.InstanceStateAndReason{
State: alertState.State,
Reason: alertState.StateReason,
}.String(),
State: state.FormatStateAndReason(alertState.State, alertState.StateReason),
ActiveAt: &startsAt,
Value: valString,
})
@@ -221,11 +217,7 @@ func (srv PrometheusSrv) toRuleGroup(groupName string, folder *models.Folder, ru
// TODO: or should we make this two fields? Using one field lets the
// frontend use the same logic for parsing text on annotations and this.
State: state.InstanceStateAndReason{
State: alertState.State,
Reason: alertState.StateReason,
}.String(),
State: state.FormatStateAndReason(alertState.State, alertState.StateReason),
ActiveAt: &activeAt,
Value: valString,
}

View File

@@ -33,45 +33,82 @@ func NewAnnotationHistorian(annotations annotations.Repository, dashboards dashb
}
}
func (h *AnnotationStateHistorian) RecordState(ctx context.Context, rule *ngmodels.AlertRule, currentState *state.State, evaluatedAt time.Time, currentData, previousData state.InstanceStateAndReason) {
logger := h.log.New(rule.GetKey().LogContext()...)
logger.Debug("Alert state changed creating annotation", "newState", currentData.String(), "oldState", previousData.String())
// RecordStates writes a number of state transitions for a given rule to state history.
//
// The annotations and the panel key are built synchronously from rule and
// states; the actual persistence then happens in a separate goroutine, so this
// method returns without waiting for the write and reports failures only
// through the logger. When states is empty there is nothing to persist and the
// method returns immediately.
func (h *AnnotationStateHistorian) RecordStates(ctx context.Context, rule *ngmodels.AlertRule, states []state.StateTransition) {
	// No transitions: avoid spawning a goroutine that would only perform a
	// dashboard lookup and an empty batch save.
	if len(states) == 0 {
		return
	}

	logger := h.log.FromContext(ctx)
	// Build annotations before starting goroutine, to make sure all data is copied and won't mutate underneath us.
	items := h.buildAnnotations(rule, states, logger)
	panel := parsePanelKey(rule, logger)
	go h.recordAnnotationsSync(ctx, panel, items, logger)
}
annotationText, annotationData := buildAnnotationTextAndData(rule, currentState)
item := &annotations.Item{
AlertId: rule.ID,
OrgId: rule.OrgID,
PrevState: previousData.String(),
NewState: currentData.String(),
Text: annotationText,
Data: annotationData,
Epoch: evaluatedAt.UnixNano() / int64(time.Millisecond),
// buildAnnotations turns each state transition of the given rule into one
// annotation item, in the same order as states. A debug line is logged per
// transition. Dashboard and panel IDs are not filled in here.
func (h *AnnotationStateHistorian) buildAnnotations(rule *ngmodels.AlertRule, states []state.StateTransition, logger log.Logger) []annotations.Item {
	items := make([]annotations.Item, len(states))
	for idx := range states {
		transition := states[idx]
		current, previous := transition.Formatted(), transition.PreviousFormatted()
		logger.Debug("Alert state changed creating annotation", "newState", current, "oldState", previous)

		text, data := buildAnnotationTextAndData(rule, transition.State)

		items[idx] = annotations.Item{
			AlertId:   rule.ID,
			OrgId:     transition.OrgID,
			PrevState: previous,
			NewState:  current,
			Text:      text,
			Data:      data,
			// Epoch is the last evaluation time expressed in milliseconds.
			Epoch: transition.LastEvaluationTime.UnixNano() / int64(time.Millisecond),
		}
	}
	return items
}
dashUid, ok := rule.Annotations[ngmodels.DashboardUIDAnnotation]
// panelKey uniquely identifies a panel.
type panelKey struct {
// orgID is the organization ID, taken from the rule the panel is attached to.
orgID int64
// dashUID is the dashboard UID read from the rule's dashboard UID annotation.
dashUID string
// panelID is the numeric panel ID parsed from the rule's panel ID annotation.
panelID int64
}
// parsePanelKey attempts to get the key of the panel attached to the given rule.
// It returns nil if the rule carries no dashboard UID annotation, or if the
// panel ID annotation cannot be parsed as a base-10 int64 (in which case the
// parse error is logged).
func parsePanelKey(rule *ngmodels.AlertRule, logger log.Logger) *panelKey {
	dashUID, hasDashboard := rule.Annotations[ngmodels.DashboardUIDAnnotation]
	if !hasDashboard {
		return nil
	}

	rawPanelID := rule.Annotations[ngmodels.PanelIDAnnotation]
	panelID, err := strconv.ParseInt(rawPanelID, 10, 64)
	if err != nil {
		logger.Error("Error parsing panelUID for alert annotation", "actual", rawPanelID, "error", err)
		return nil
	}

	key := panelKey{
		orgID:   rule.OrgID,
		dashUID: dashUID,
		panelID: panelID,
	}
	return &key
}
func (h *AnnotationStateHistorian) recordAnnotationsSync(ctx context.Context, panel *panelKey, annotations []annotations.Item, logger log.Logger) {
if panel != nil {
dashID, err := h.dashboards.getID(ctx, panel.orgID, panel.dashUID)
if err != nil {
logger.Error("Error getting dashboard for alert annotation", "dashboardUID", panel.dashUID, "error", err)
return
}
dashID, err := h.dashboards.getID(ctx, rule.OrgID, dashUid)
if err != nil {
logger.Error("Error getting dashboard for alert annotation", "dashboardUID", dashUid, "error", err)
return
for _, i := range annotations {
i.DashboardId = dashID
i.PanelId = panel.panelID
}
item.PanelId = panelId
item.DashboardId = dashID
}
if err := h.annotations.Save(ctx, item); err != nil {
logger.Error("Error saving alert annotation", "error", err)
return
if err := h.annotations.SaveMany(ctx, annotations); err != nil {
logger.Error("Error saving alert annotation batch", "error", err)
}
logger.Debug("Done saving alert annotation batch")
}
func buildAnnotationTextAndData(rule *ngmodels.AlertRule, currentState *state.State) (string, *simplejson.Json) {

View File

@@ -2,7 +2,6 @@ package state
import (
"context"
"fmt"
"net/url"
"time"
@@ -169,23 +168,40 @@ func (st *Manager) ResetStateByRuleUID(ctx context.Context, ruleKey ngModels.Ale
// ProcessEvalResults feeds every evaluation result of alertRule through
// setNextState, hands the processed states to staleResultsHandler, saves the
// new states when an instance store is configured, and passes the transitions
// whose state or reason changed to the historian when one is configured.
//
// It returns the states of all processed results followed by the states
// returned by staleResultsHandler. The returned slice is never nil.
func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time, alertRule *ngModels.AlertRule, results eval.Results, extraLabels data.Labels) []*State {
	logger := st.log.FromContext(ctx)
	logger.Debug("State manager processing evaluation results", "resultCount", len(results))

	states := make([]StateTransition, 0, len(results))
	processedResults := make(map[string]*State, len(results))
	for _, result := range results {
		s := st.setNextState(ctx, alertRule, result, extraLabels, logger)
		states = append(states, s)
		processedResults[s.State.CacheID] = s.State
	}
	resolvedStates := st.staleResultsHandler(ctx, evaluatedAt, alertRule, processedResults, logger)

	if len(states) > 0 && st.instanceStore != nil {
		logger.Debug("Saving new states to the database", "count", len(states))
		// The saved/failed counts are intentionally ignored here; saveAlertStates
		// logs its own failures.
		_, _ = st.saveAlertStates(ctx, states...)
	}

	changedStates := make([]StateTransition, 0, len(states))
	for _, s := range states {
		if s.changed() {
			changedStates = append(changedStates, s)
		}
	}

	if st.historian != nil {
		st.historian.RecordStates(ctx, alertRule, changedStates)
	}

	// Size the result for both groups and fill it directly, instead of first
	// appending resolvedStates onto states (which could reuse states' backing
	// array) and then under-allocating the output.
	nextStates := make([]*State, 0, len(states)+len(resolvedStates))
	for _, s := range states {
		nextStates = append(nextStates, s.State)
	}
	for _, s := range resolvedStates {
		nextStates = append(nextStates, s.State)
	}
	return nextStates
}
// Set the current state based on evaluation results
func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result, extraLabels data.Labels, logger log.Logger) *State {
func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRule, result eval.Result, extraLabels data.Labels, logger log.Logger) StateTransition {
currentState := st.cache.getOrCreate(ctx, st.log, alertRule, result, extraLabels, st.externalURL)
currentState.LastEvaluationTime = result.EvaluatedAt
@@ -241,13 +257,13 @@ func (st *Manager) setNextState(ctx context.Context, alertRule *ngModels.AlertRu
st.cache.set(currentState)
shouldUpdateAnnotation := oldState != currentState.State || oldReason != currentState.StateReason
if shouldUpdateAnnotation && st.historian != nil {
go st.historian.RecordState(ctx, alertRule, currentState, result.EvaluatedAt,
InstanceStateAndReason{State: currentState.State, Reason: currentState.StateReason},
InstanceStateAndReason{State: oldState, Reason: oldReason})
nextState := StateTransition{
State: currentState,
PreviousState: oldState,
PreviousStateReason: oldReason,
}
return currentState
return nextState
}
func (st *Manager) GetAll(orgID int64) []*State {
@@ -283,12 +299,13 @@ func (st *Manager) Put(states []*State) {
}
// TODO: Is the `State` type necessary? Should it embed the instance?
func (st *Manager) saveAlertStates(ctx context.Context, states ...*State) (saved, failed int) {
func (st *Manager) saveAlertStates(ctx context.Context, states ...StateTransition) (saved, failed int) {
logger := st.log.FromContext(ctx)
if st.instanceStore == nil {
return 0, 0
}
st.log.Debug("Saving alert states", "count", len(states))
logger.Debug("Saving alert states", "count", len(states))
instances := make([]ngModels.AlertInstance, 0, len(states))
type debugInfo struct {
@@ -303,8 +320,8 @@ func (st *Manager) saveAlertStates(ctx context.Context, states ...*State) (saved
labels := ngModels.InstanceLabels(s.Labels)
_, hash, err := labels.StringAndHash()
if err != nil {
debug = append(debug, debugInfo{s.OrgID, s.AlertRuleUID, s.State.String(), s.Labels.String()})
st.log.Error("Failed to save alert instance with invalid labels", "orgID", s.OrgID, "rule", s.AlertRuleUID, "error", err)
debug = append(debug, debugInfo{s.OrgID, s.AlertRuleUID, s.State.State.String(), s.Labels.String()})
logger.Error("Failed to save alert instance with invalid labels", "error", err)
continue
}
fields := ngModels.AlertInstance{
@@ -314,7 +331,7 @@ func (st *Manager) saveAlertStates(ctx context.Context, states ...*State) (saved
LabelsHash: hash,
},
Labels: ngModels.InstanceLabels(s.Labels),
CurrentState: ngModels.InstanceStateType(s.State.String()),
CurrentState: ngModels.InstanceStateType(s.State.State.String()),
CurrentReason: s.StateReason,
LastEvalTime: s.LastEvaluationTime,
CurrentStateSince: s.StartsAt,
@@ -327,7 +344,7 @@ func (st *Manager) saveAlertStates(ctx context.Context, states ...*State) (saved
for _, inst := range instances {
debug = append(debug, debugInfo{inst.RuleOrgID, inst.RuleUID, string(inst.CurrentState), data.Labels(inst.Labels).String()})
}
st.log.Error("Failed to save alert states", "states", debug, "error", err)
logger.Error("Failed to save alert states", "states", debug, "error", err)
return 0, len(debug)
}
@@ -346,26 +363,12 @@ func translateInstanceState(state ngModels.InstanceStateType) eval.State {
}
}
// This struct provides grouping of state with reason, and string formatting.
type InstanceStateAndReason struct {
State eval.State
Reason string
}
func (i InstanceStateAndReason) String() string {
s := fmt.Sprintf("%v", i.State)
if len(i.Reason) > 0 {
s += fmt.Sprintf(" (%v)", i.Reason)
}
return s
}
func (st *Manager) staleResultsHandler(ctx context.Context, evaluatedAt time.Time, alertRule *ngModels.AlertRule, states map[string]*State, logger log.Logger) []*State {
func (st *Manager) staleResultsHandler(ctx context.Context, evaluatedAt time.Time, alertRule *ngModels.AlertRule, states map[string]*State, logger log.Logger) []StateTransition {
// If we are removing two or more stale series it makes sense to share the resolved image as the alert rule is the same.
// TODO: We will need to change this when we support images without screenshots as each series will have a different image
var resolvedImage *ngModels.Image
var resolvedStates []*State
var resolvedStates []StateTransition
allStates := st.GetStatesForRuleUID(alertRule.OrgID, alertRule.UID)
toDelete := make([]ngModels.AlertInstanceKey, 0)
@@ -383,16 +386,18 @@ func (st *Manager) staleResultsHandler(ctx context.Context, evaluatedAt time.Tim
toDelete = append(toDelete, ngModels.AlertInstanceKey{RuleOrgID: s.OrgID, RuleUID: s.AlertRuleUID, LabelsHash: labelsHash})
if s.State == eval.Alerting {
previousState := InstanceStateAndReason{State: s.State, Reason: s.StateReason}
oldState := s.State
oldReason := s.StateReason
s.State = eval.Normal
s.StateReason = ngModels.StateReasonMissingSeries
s.EndsAt = evaluatedAt
s.Resolved = true
if st.historian != nil {
st.historian.RecordState(ctx, alertRule, s, evaluatedAt,
InstanceStateAndReason{State: eval.Normal, Reason: s.StateReason},
previousState,
)
s.LastEvaluationTime = evaluatedAt
record := StateTransition{
State: s,
PreviousState: oldState,
PreviousStateReason: oldReason,
}
// If there is no resolved image for this rule then take one
@@ -408,11 +413,15 @@ func (st *Manager) staleResultsHandler(ctx context.Context, evaluatedAt time.Tim
}
}
s.Image = resolvedImage
resolvedStates = append(resolvedStates, s)
resolvedStates = append(resolvedStates, record)
}
}
}
if st.historian != nil {
st.historian.RecordStates(ctx, alertRule, resolvedStates)
}
if st.instanceStore != nil {
if err := st.instanceStore.DeleteAlertInstances(ctx, toDelete...); err != nil {
logger.Error("Unable to delete stale instances from database", "error", err, "count", len(toDelete))

View File

@@ -2,7 +2,6 @@ package state
import (
"context"
"time"
"github.com/grafana/grafana/pkg/services/ngalert/models"
)
@@ -23,5 +22,6 @@ type RuleReader interface {
// Historian maintains an audit log of alert state history.
type Historian interface {
RecordState(ctx context.Context, rule *models.AlertRule, currentState *State, evaluatedAt time.Time, currentData, previousData InstanceStateAndReason)
// RecordStates writes a number of state transitions for a given rule to state history.
RecordStates(ctx context.Context, rule *models.AlertRule, states []StateTransition)
}

View File

@@ -73,6 +73,25 @@ func (a *State) GetRuleKey() models.AlertRuleKey {
}
}
// StateTransition describes the transition from one state to another.
// The embedded State holds the current state; PreviousState and
// PreviousStateReason hold what it was before the transition.
type StateTransition struct {
	*State
	PreviousState       eval.State
	PreviousStateReason string
}

// Formatted returns the current state and its reason rendered as a single
// string by FormatStateAndReason.
func (t StateTransition) Formatted() string {
	current := t.State
	return FormatStateAndReason(current.State, current.StateReason)
}

// PreviousFormatted returns the previous state and its reason rendered as a
// single string by FormatStateAndReason.
func (t StateTransition) PreviousFormatted() string {
	prevState, prevReason := t.PreviousState, t.PreviousStateReason
	return FormatStateAndReason(prevState, prevReason)
}

// changed reports whether the state or the state reason differs between the
// previous and the current state.
func (t StateTransition) changed() bool {
	sameState := t.PreviousState == t.State.State
	sameReason := t.PreviousStateReason == t.State.StateReason
	return !(sameState && sameReason)
}
type Evaluation struct {
EvaluationTime time.Time
EvaluationState eval.State
@@ -311,3 +330,11 @@ func takeImage(ctx context.Context, s image.ImageService, r *models.AlertRule) (
}
return img, nil
}
// FormatStateAndReason renders a state and an optional reason as one string:
// the state's default formatting on its own when reason is empty, otherwise
// the state followed by the reason in parentheses, e.g. "<state> (<reason>)".
func FormatStateAndReason(state eval.State, reason string) string {
	if reason == "" {
		return fmt.Sprintf("%v", state)
	}
	return fmt.Sprintf("%v (%v)", state, reason)
}

View File

@@ -3,7 +3,6 @@ package state
import (
"context"
"sync"
"time"
"github.com/grafana/grafana/pkg/services/ngalert/models"
)
@@ -49,5 +48,5 @@ func (f *FakeRuleReader) ListAlertRules(_ context.Context, q *models.ListAlertRu
type FakeHistorian struct{}
func (f *FakeHistorian) RecordState(ctx context.Context, rule *models.AlertRule, currentState *State, evaluatedAt time.Time, currentData, previousData InstanceStateAndReason) {
func (f *FakeHistorian) RecordStates(ctx context.Context, rule *models.AlertRule, states []StateTransition) {
}