Alerting/Annotations: Prevent panics from composite store jobs from crashing Grafana (#83459)

* Don't directly use pointer to json

* Don't crash entire process if a store job panics

* Add debug logs when failing to parse/handle Loki entries
This commit is contained in:
William Wernert 2024-02-28 13:16:37 -05:00 committed by GitHub
parent 9190fb28e8
commit af528d2f66
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 116 additions and 9 deletions

View File

@ -38,7 +38,7 @@ func ProvideService(
historianStore := loki.NewLokiHistorianStore(cfg.UnifiedAlerting.StateHistory, features, db, log.New("annotations.loki"))
if historianStore != nil {
l.Debug("Using composite read store")
read = NewCompositeStore(xormStore, historianStore)
read = NewCompositeStore(log.New("annotations.composite"), xormStore, historianStore)
} else {
l.Debug("Using xorm read store")
read = write

View File

@ -2,8 +2,11 @@ package annotationsimpl
import (
"context"
"fmt"
"sort"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/grafana/pkg/services/annotations"
"github.com/grafana/grafana/pkg/services/annotations/accesscontrol"
@ -11,20 +14,29 @@ import (
// CompositeStore is a read store that combines two or more read stores, and queries all stores in parallel.
type CompositeStore struct {
logger log.Logger
readers []readStore
}
func NewCompositeStore(readers ...readStore) *CompositeStore {
func NewCompositeStore(logger log.Logger, readers ...readStore) *CompositeStore {
return &CompositeStore{
logger: logger,
readers: readers,
}
}
// Satisfy the commonStore interface, in practice this is not used.
func (c *CompositeStore) Type() string {
return "composite"
}
// Get returns annotations from all stores, and combines the results.
func (c *CompositeStore) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
itemCh := make(chan []*annotations.ItemDTO, len(c.readers))
err := concurrency.ForEachJob(ctx, len(c.readers), len(c.readers), func(ctx context.Context, i int) error {
err := concurrency.ForEachJob(ctx, len(c.readers), len(c.readers), func(ctx context.Context, i int) (err error) {
defer handleJobPanic(c.logger, c.readers[i].Type(), &err)
items, err := c.readers[i].Get(ctx, query, accessResources)
itemCh <- items
return err
@ -47,7 +59,9 @@ func (c *CompositeStore) Get(ctx context.Context, query *annotations.ItemQuery,
func (c *CompositeStore) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) {
resCh := make(chan annotations.FindTagsResult, len(c.readers))
err := concurrency.ForEachJob(ctx, len(c.readers), len(c.readers), func(ctx context.Context, i int) error {
err := concurrency.ForEachJob(ctx, len(c.readers), len(c.readers), func(ctx context.Context, i int) (err error) {
defer handleJobPanic(c.logger, c.readers[i].Type(), &err)
res, err := c.readers[i].GetTags(ctx, query)
resCh <- res
return err
@ -65,3 +79,20 @@ func (c *CompositeStore) GetTags(ctx context.Context, query *annotations.TagsQue
return annotations.FindTagsResult{Tags: res}, nil
}
// handleJobPanic is a helper function that recovers from a panic in a concurrent job.,
// It will log the error and set the job error if it is not nil.
func handleJobPanic(logger log.Logger, storeType string, jobErr *error) {
if r := recover(); r != nil {
logger.Error("Annotation store panic", "error", r, "store", storeType, "stack", log.Stack(1))
errMsg := "concurrent job panic"
if jobErr != nil {
err := fmt.Errorf(errMsg)
if panicErr, ok := r.(error); ok {
err = fmt.Errorf("%s: %w", errMsg, panicErr)
}
*jobErr = err
}
}
}

View File

@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/annotations"
"github.com/grafana/grafana/pkg/services/annotations/accesscontrol"
"github.com/stretchr/testify/require"
@ -18,6 +19,22 @@ var (
)
func TestCompositeStore(t *testing.T) {
t.Run("should handle panic", func(t *testing.T) {
r1 := newFakeReader()
getPanic := func(context.Context, *annotations.ItemQuery, *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
panic("ohno")
}
r2 := newFakeReader(withGetFn(getPanic))
store := &CompositeStore{
log.NewNopLogger(),
[]readStore{r1, r2},
}
_, err := store.Get(context.Background(), nil, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "concurrent job panic")
})
t.Run("should return first error", func(t *testing.T) {
err1 := errors.New("error 1")
r1 := newFakeReader(withError(err1))
@ -25,6 +42,7 @@ func TestCompositeStore(t *testing.T) {
r2 := newFakeReader(withError(err2), withWait(10*time.Millisecond))
store := &CompositeStore{
log.NewNopLogger(),
[]readStore{r1, r2},
}
@ -64,6 +82,7 @@ func TestCompositeStore(t *testing.T) {
r2 := newFakeReader(withItems(items2))
store := &CompositeStore{
log.NewNopLogger(),
[]readStore{r1, r2},
}
@ -92,6 +111,7 @@ func TestCompositeStore(t *testing.T) {
r2 := newFakeReader(withTags(tags2))
store := &CompositeStore{
log.NewNopLogger(),
[]readStore{r1, r2},
}
@ -108,13 +128,23 @@ func TestCompositeStore(t *testing.T) {
}
type fakeReader struct {
items []*annotations.ItemDTO
tagRes annotations.FindTagsResult
wait time.Duration
err error
items []*annotations.ItemDTO
tagRes annotations.FindTagsResult
getFn func(context.Context, *annotations.ItemQuery, *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error)
getTagFn func(context.Context, *annotations.TagsQuery) (annotations.FindTagsResult, error)
wait time.Duration
err error
}
func (f *fakeReader) Type() string {
return "fake"
}
func (f *fakeReader) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
if f.getFn != nil {
return f.getFn(ctx, query, accessResources)
}
if f.wait > 0 {
time.Sleep(f.wait)
}
@ -128,6 +158,10 @@ func (f *fakeReader) Get(ctx context.Context, query *annotations.ItemQuery, acce
}
func (f *fakeReader) GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error) {
if f.getTagFn != nil {
return f.getTagFn(ctx, query)
}
if f.wait > 0 {
time.Sleep(f.wait)
}
@ -164,6 +198,12 @@ func withTags(tags []*annotations.TagsDTO) func(*fakeReader) {
}
}
func withGetFn(fn func(context.Context, *annotations.ItemQuery, *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error)) func(*fakeReader) {
return func(f *fakeReader) {
f.getFn = fn
}
}
func newFakeReader(opts ...func(*fakeReader)) *fakeReader {
f := &fakeReader{}
for _, opt := range opts {

View File

@ -69,6 +69,10 @@ func NewLokiHistorianStore(cfg setting.UnifiedAlertingStateHistorySettings, ft f
}
}
func (r *LokiHistorianStore) Type() string {
return "loki"
}
func (r *LokiHistorianStore) Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error) {
if query.Type == "annotation" {
return make([]*annotations.ItemDTO, 0), nil
@ -124,6 +128,7 @@ func (r *LokiHistorianStore) annotationsFromStream(stream historian.Stream, ac a
err := json.Unmarshal([]byte(sample.V), &entry)
if err != nil {
// bad data, skip
r.log.Debug("failed to unmarshal loki entry", "error", err, "entry", sample.V)
continue
}
@ -135,6 +140,7 @@ func (r *LokiHistorianStore) annotationsFromStream(stream historian.Stream, ac a
transition, err := buildTransition(entry)
if err != nil {
// bad data, skip
r.log.Debug("failed to build transition", "error", err, "entry", entry)
continue
}
@ -207,6 +213,10 @@ type number interface {
// numericMap converts a simplejson map[string]any to a map[string]N, where N is numeric (int or float).
func numericMap[N number](j *simplejson.Json) (map[string]N, error) {
if j == nil {
return nil, fmt.Errorf("unexpected nil value")
}
m, err := j.Map()
if err != nil {
return nil, err

View File

@ -374,7 +374,23 @@ func TestHasAccess(t *testing.T) {
})
}
func TestFloat64Map(t *testing.T) {
func TestNumericMap(t *testing.T) {
t.Run("should return error for nil value", func(t *testing.T) {
var jsonMap *simplejson.Json
_, err := numericMap[float64](jsonMap)
require.Error(t, err)
require.Contains(t, err.Error(), "unexpected nil value")
})
t.Run("should return error for nil interface value", func(t *testing.T) {
jsonMap := simplejson.NewFromAny(map[string]any{
"key1": nil,
})
_, err := numericMap[float64](jsonMap)
require.Error(t, err)
require.Contains(t, err.Error(), "unexpected value type")
})
t.Run(`should convert json string:float kv to Golang map[string]float64`, func(t *testing.T) {
jsonMap := simplejson.NewFromAny(map[string]any{
"key1": json.Number("1.0"),

View File

@ -14,12 +14,18 @@ type store interface {
writeStore
}
type commonStore interface {
Type() string
}
type readStore interface {
commonStore
Get(ctx context.Context, query *annotations.ItemQuery, accessResources *accesscontrol.AccessResources) ([]*annotations.ItemDTO, error)
GetTags(ctx context.Context, query *annotations.TagsQuery) (annotations.FindTagsResult, error)
}
type writeStore interface {
commonStore
Add(ctx context.Context, items *annotations.Item) error
AddMany(ctx context.Context, items []annotations.Item) error
Update(ctx context.Context, item *annotations.Item) error

View File

@ -54,6 +54,10 @@ func NewXormStore(cfg *setting.Cfg, l log.Logger, db db.DB, tagService tag.Servi
}
}
func (r *xormRepositoryImpl) Type() string {
return "sql"
}
func (r *xormRepositoryImpl) Add(ctx context.Context, item *annotations.Item) error {
tags := tag.ParseTagPairs(item.Tags)
item.Tags = tag.JoinTagPairs(tags)