stop flushing state when Grafana stops (#55504)

This commit is contained in:
Yuriy Tseretyan 2022-09-21 10:10:17 -04:00 committed by GitHub
parent 17fc5d82d3
commit 0629d3922a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 2 additions and 67 deletions

View File

@ -307,7 +307,7 @@ func (sch *schedule) schedulePeriodic(ctx context.Context) error {
// waiting for all rule evaluation routines to stop
waitErr := dispatcherGroup.Wait()
// close the state manager and flush the state
sch.stateManager.Close(ctx)
sch.stateManager.Close()
return waitErr
}
}

View File

@ -68,9 +68,8 @@ func NewManager(logger log.Logger, metrics *metrics.State, externalURL *url.URL,
return manager
}
func (st *Manager) Close(ctx context.Context) {
func (st *Manager) Close() {
st.quit <- struct{}{}
st.flushState(ctx)
}
func (st *Manager) Warm(ctx context.Context) {
@ -319,28 +318,6 @@ func (st *Manager) Put(states []*State) {
}
}
// flushState dumps the entire state to the database
func (st *Manager) flushState(ctx context.Context) {
t := st.clock.Now()
st.log.Info("flushing the state")
st.cache.mtxStates.Lock()
defer st.cache.mtxStates.Unlock()
totalStates, errorsCnt := 0, 0
for _, orgStates := range st.cache.states {
for _, ruleStates := range orgStates {
for _, state := range ruleStates {
err := st.saveState(ctx, state)
totalStates++
if err != nil {
st.log.Error("failed to save alert state", append(state.GetRuleKey().LogContext(), "labels", state.Labels.String(), "state", state.State.String(), "err", err.Error()))
errorsCnt++
}
}
}
}
st.log.Info("the state has been flushed", "total_instances", totalStates, "errors", errorsCnt, "took", st.clock.Since(t))
}
func (st *Manager) saveState(ctx context.Context, s *State) error {
cmd := ngModels.SaveAlertInstanceCommand{
RuleOrgID: s.OrgID,

View File

@ -7,7 +7,6 @@ import (
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/benbjohnson/clock"
@ -16,7 +15,6 @@ import (
"github.com/grafana/grafana/pkg/services/annotations/annotationstest"
"github.com/grafana/grafana/pkg/services/dashboards"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/image"
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/store"
@ -152,43 +150,3 @@ func TestIsItStale(t *testing.T) {
})
}
}
func TestClose(t *testing.T) {
instanceStore := &store.FakeInstanceStore{}
clk := clock.New()
st := NewManager(log.New("test_state_manager"), metrics.NewNGAlert(prometheus.NewPedanticRegistry()).GetStateMetrics(), nil, nil, instanceStore, &dashboards.FakeDashboardService{}, &image.NotAvailableImageService{}, clk, annotationstest.NewFakeAnnotationsRepo())
_, rules := ngmodels.GenerateUniqueAlertRules(10, ngmodels.AlertRuleGen())
for _, rule := range rules {
results := eval.GenerateResults(rand.Intn(4)+1, eval.ResultGen(eval.WithEvaluatedAt(clk.Now())))
_ = st.ProcessEvalResults(context.Background(), clk.Now(), rule, results, ngmodels.GenerateAlertLabels(rand.Intn(4), "extra_"))
}
var states []*State
for _, org := range st.cache.states {
for _, rule := range org {
for _, state := range rule {
states = append(states, state)
}
}
}
instanceStore.RecordedOps = nil
st.Close(context.Background())
t.Run("should flush the state to store", func(t *testing.T) {
savedStates := make(map[string]ngmodels.SaveAlertInstanceCommand)
for _, op := range instanceStore.RecordedOps {
switch q := op.(type) {
case ngmodels.SaveAlertInstanceCommand:
cacheId, err := q.Labels.StringKey()
require.NoError(t, err)
savedStates[cacheId] = q
}
}
require.Len(t, savedStates, len(states))
for _, s := range states {
require.Contains(t, savedStates, s.CacheId)
}
})
}