mirror of
https://github.com/grafana/grafana.git
synced 2024-11-29 04:04:00 -06:00
Alerting: Fix migration edge-case race condition for silences (#81206)
If the database already has an entry in the kvstore for the silences of an alertmanager before the migration has taken place, then it's possible that the active alertmanager will overwrite the silence file created by the migration before it has a chance to load it into memory. This should not happen normally but is possible in edge cases. This change opts to bypass the unnecessary step of writing the silences to disk during the migration and instead writes them directly to the kvstore. This avoids the race condition entirely and is more correct as we treat the database as the source of truth for AM state.
This commit is contained in:
parent
7f7ab32444
commit
e7c6e9c5c9
@ -65,8 +65,7 @@ func ProvideService(
|
||||
migrationStore: migrationStore,
|
||||
encryptionService: encryptionService,
|
||||
silences: &silenceHandler{
|
||||
dataPath: cfg.DataPath,
|
||||
createSilenceFile: openReplace,
|
||||
persistSilences: migrationStore.SetSilences,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
@ -493,7 +492,7 @@ func (ms *migrationService) migrateAllOrgs(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
err = ms.silences.createSilences(o.ID, om.log)
|
||||
err = ms.silences.createSilences(ctx, o.ID, om.log)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create silences for org %d: %w", o.ID, err)
|
||||
}
|
||||
|
@ -1,16 +1,10 @@
|
||||
package migration
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/matttproud/golang_protobuf_extensions/pbutil"
|
||||
pb "github.com/prometheus/alertmanager/silence/silencepb"
|
||||
"github.com/prometheus/common/model"
|
||||
|
||||
@ -27,9 +21,7 @@ var TimeNow = time.Now
|
||||
// silenceHandler tracks which keep_state silences the migration needs and
// holds the hooks used to persist the silences that are created.
type silenceHandler struct {
|
||||
// rulesWithErrorSilenceLabels is checked against zero by createSilences to decide
// whether the ExecutionErrorState = keep_state silence is created; it is logged as "rules".
rulesWithErrorSilenceLabels int
|
||||
// NOTE(review): presumably the NoData counterpart of the counter above; the
// code that reads it is not visible here — confirm.
rulesWithNoDataSilenceLabels int
|
||||
// createSilenceFile opens the writer that writeSilencesFile writes the encoded silences to.
createSilenceFile func(filename string) (io.WriteCloser, error)
|
||||
|
||||
// dataPath is the base directory passed to silencesFileNameForOrg.
dataPath string
|
||||
// persistSilences is called by createSilences with the org ID and the silences to store.
persistSilences func(context.Context, int64, []*pb.MeshSilence) error
|
||||
}
|
||||
|
||||
// handleSilenceLabels adds labels to the alert rule if the rule requires silence labels for error/nodata keep_state.
|
||||
@ -45,7 +37,7 @@ func (sh *silenceHandler) handleSilenceLabels(ar *models.AlertRule, parsedSettin
|
||||
}
|
||||
|
||||
// createSilences creates silences and writes them to the kvstore.
|
||||
func (sh *silenceHandler) createSilences(orgID int64, log log.Logger) error {
|
||||
func (sh *silenceHandler) createSilences(ctx context.Context, orgID int64, log log.Logger) error {
|
||||
var silences []*pb.MeshSilence
|
||||
if sh.rulesWithErrorSilenceLabels > 0 {
|
||||
log.Info("Creating silence for rules with ExecutionErrorState = keep_state", "rules", sh.rulesWithErrorSilenceLabels)
|
||||
@ -56,9 +48,9 @@ func (sh *silenceHandler) createSilences(orgID int64, log log.Logger) error {
|
||||
silences = append(silences, noDataSilence())
|
||||
}
|
||||
if len(silences) > 0 {
|
||||
log.Debug("Writing silences file", "silences", len(silences))
|
||||
if err := sh.writeSilencesFile(orgID, silences); err != nil {
|
||||
return fmt.Errorf("write silence file: %w", err)
|
||||
log.Debug("Writing silences to kvstore", "silences", len(silences))
|
||||
if err := sh.persistSilences(ctx, orgID, silences); err != nil {
|
||||
return fmt.Errorf("write silences to kvstore: %w", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
@ -115,64 +107,3 @@ func noDataSilence() *pb.MeshSilence {
|
||||
ExpiresAt: TimeNow().AddDate(1, 0, 0), // 1 year.
|
||||
}
|
||||
}
|
||||
|
||||
func (sh *silenceHandler) writeSilencesFile(orgId int64, silences []*pb.MeshSilence) error {
|
||||
var buf bytes.Buffer
|
||||
for _, e := range silences {
|
||||
if _, err := pbutil.WriteDelimited(&buf, e); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
f, err := sh.createSilenceFile(silencesFileNameForOrg(sh.dataPath, orgId))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err := io.Copy(f, bytes.NewReader(buf.Bytes())); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return f.Close()
|
||||
}
|
||||
|
||||
// silencesFileNameForOrg returns the path of the silences file for the given
// organization: <dataPath>/alerting/<orgID>/silences.
func silencesFileNameForOrg(dataPath string, orgID int64) string {
	// FormatInt keeps the full 64-bit ID; converting through int first would
	// truncate large IDs on platforms where int is 32 bits wide.
	return filepath.Join(dataPath, "alerting", strconv.FormatInt(orgID, 10), "silences")
}
|
||||
|
||||
// replaceFile wraps a temporary file that is moved to another filename on closing.
type replaceFile struct {
	*os.File
	// filename is the path the temporary file is renamed to by Close.
	filename string
}

// Close syncs and closes the temporary file and then renames it to f.filename.
// If syncing or closing fails, that error is returned, the rename is skipped,
// and the temporary file is removed so no partial file is left behind.
func (f *replaceFile) Close() error {
	tmpName := f.File.Name()

	if err := f.File.Sync(); err != nil {
		// Still release the descriptor and drop the partial file. The sync
		// error is the one worth reporting, so cleanup errors are ignored.
		_ = f.File.Close()
		_ = os.Remove(tmpName)
		return err
	}
	if err := f.File.Close(); err != nil {
		// Best-effort cleanup; the close error is what the caller needs.
		_ = os.Remove(tmpName)
		return err
	}
	return os.Rename(tmpName, f.filename)
}
|
||||
|
||||
// openReplace opens a new temporary file that is moved to filename on closing.
|
||||
func openReplace(filename string) (io.WriteCloser, error) {
|
||||
tmpFilename := fmt.Sprintf("%s.%x", filename, uint64(rand.Int63()))
|
||||
|
||||
if err := os.MkdirAll(filepath.Dir(tmpFilename), os.ModePerm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
//nolint:gosec
|
||||
f, err := os.Create(tmpFilename)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rf := &replaceFile{
|
||||
File: f,
|
||||
filename: filename,
|
||||
}
|
||||
return rf, nil
|
||||
}
|
||||
|
@ -1,10 +1,11 @@
|
||||
package migration
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -13,10 +14,11 @@ import (
|
||||
"github.com/matttproud/golang_protobuf_extensions/pbutil"
|
||||
pb "github.com/prometheus/alertmanager/silence/silencepb"
|
||||
"github.com/stretchr/testify/require"
|
||||
"xorm.io/xorm"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
legacymodels "github.com/grafana/grafana/pkg/services/alerting/models"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/notifier"
|
||||
)
|
||||
|
||||
func TestSilences(t *testing.T) {
|
||||
@ -79,32 +81,13 @@ func TestSilences(t *testing.T) {
|
||||
_, err = x.Insert(test.alerts)
|
||||
require.NoError(t, err)
|
||||
|
||||
cfg := setting.NewCfg()
|
||||
cfg.DataPath = "/a/b/c"
|
||||
service := NewTestMigrationService(t, sqlStore, cfg)
|
||||
|
||||
sb := stringsBuilderCloser{
|
||||
Builder: &strings.Builder{},
|
||||
}
|
||||
silenceFileAsString := func(filename string) (io.WriteCloser, error) {
|
||||
_, err := sb.WriteString(filename)
|
||||
require.NoError(t, err)
|
||||
return sb, nil
|
||||
}
|
||||
service.silences.createSilenceFile = silenceFileAsString
|
||||
service := NewTestMigrationService(t, sqlStore, nil)
|
||||
|
||||
require.NoError(t, service.migrateAllOrgs(context.Background()))
|
||||
|
||||
expectedFilename := ""
|
||||
if len(test.expectedSilences) > 0 {
|
||||
expectedFilename = cfg.DataPath + "/alerting/1/silences"
|
||||
filename := sb.String()[:len(expectedFilename)]
|
||||
require.Equal(t, expectedFilename, filename)
|
||||
}
|
||||
// Get silences from kvstore.
|
||||
st := getSilenceState(t, x, o.ID)
|
||||
|
||||
contents := sb.String()[len(expectedFilename):]
|
||||
st, err := decodeState(strings.NewReader(contents))
|
||||
require.NoError(t, err)
|
||||
require.Len(t, st, len(test.expectedSilences))
|
||||
|
||||
silences := make([]*pb.MeshSilence, 0, len(st))
|
||||
@ -125,12 +108,19 @@ func TestSilences(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
type stringsBuilderCloser struct {
|
||||
*strings.Builder
|
||||
}
|
||||
// getSilenceState returns the silences state from the kvstore.
|
||||
func getSilenceState(t *testing.T, x *xorm.Engine, orgId int64) state {
|
||||
content := ""
|
||||
_, err := x.Table("kv_store").Where("org_id = ? AND namespace = ? AND key = ?", orgId, notifier.KVNamespace, notifier.SilencesFilename).Cols("value").Get(&content)
|
||||
require.NoError(t, err)
|
||||
|
||||
func (s stringsBuilderCloser) Close() error {
|
||||
return nil
|
||||
b, err := base64.StdEncoding.DecodeString(content)
|
||||
require.NoError(t, err)
|
||||
|
||||
st, err := decodeState(bytes.NewReader(b))
|
||||
require.NoError(t, err)
|
||||
|
||||
return st
|
||||
}
|
||||
|
||||
// state copied from prometheus-alertmanager/silence/silence.go.
|
||||
|
@ -1,7 +1,9 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@ -11,6 +13,8 @@ import (
|
||||
"strings"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/matttproud/golang_protobuf_extensions/pbutil"
|
||||
pb "github.com/prometheus/alertmanager/silence/silencepb"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/infra/kvstore"
|
||||
@ -96,6 +100,8 @@ type WriteStore interface {
|
||||
DeleteFolders(ctx context.Context, orgID int64, uids ...string) error
|
||||
|
||||
UpdateRuleLabels(ctx context.Context, key models.AlertRuleKeyWithVersion, labels data.Labels) error
|
||||
|
||||
SetSilences(ctx context.Context, orgID int64, silences []*pb.MeshSilence) error
|
||||
}
|
||||
|
||||
type migrationStore struct {
|
||||
@ -273,6 +279,20 @@ func (ms *migrationStore) SetOrgMigrationState(ctx context.Context, orgID int64,
|
||||
return kv.Set(ctx, stateKey, string(raw))
|
||||
}
|
||||
|
||||
// SetSilences stores the given silences in the kvstore.
|
||||
func (ms *migrationStore) SetSilences(ctx context.Context, orgID int64, silences []*pb.MeshSilence) error {
|
||||
kv := kvstore.WithNamespace(ms.kv, orgID, notifier.KVNamespace)
|
||||
|
||||
var buf bytes.Buffer
|
||||
for _, e := range silences {
|
||||
if _, err := pbutil.WriteDelimited(&buf, e); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return kv.Set(ctx, notifier.SilencesFilename, base64.StdEncoding.EncodeToString(buf.Bytes()))
|
||||
}
|
||||
|
||||
// GetAlertRuleTitles returns a map of namespaceUID -> title for all alert rules in the given org and namespace uids.
|
||||
func (ms *migrationStore) GetAlertRuleTitles(ctx context.Context, orgID int64, namespaceUIDs ...string) (map[string][]string, error) {
|
||||
res := make(map[string][]string)
|
||||
|
Loading…
Reference in New Issue
Block a user