Alerting: Create new state history "fanout" backend that dispatches to multiple other backends at once (#64774)

* Rename RecordStatesAsync to Record

* Rename QueryStates to Query

* Implement fanout writes

* Implement primary queries

* Simplify error joining

* Add test for query path

* Add tests for writes and error propagation

* Allow fanout backend to be configured

* Touch up log messages and config validation

* Consistent documentation for all backend structs

* Parse and normalize backend names more consistently against an enum

* Touch-ups to documentation

* Improve clarity around multi-record blocking

* Keep primary and secondaries more distinct

* Rename fanout backend to multiple backend

* Simplify config keys for multi backend mode
This commit is contained in:
Alexander Weaver 2023-03-17 12:41:18 -05:00 committed by GitHub
parent e01a3e0ea5
commit a31672fa40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 281 additions and 28 deletions

View File

@ -14,7 +14,7 @@ import (
)
type Historian interface {
QueryStates(ctx context.Context, query models.HistoryQuery) (*data.Frame, error)
Query(ctx context.Context, query models.HistoryQuery) (*data.Frame, error)
}
type HistorySrv struct {
@ -44,7 +44,7 @@ func (srv *HistorySrv) RouteQueryStateHistory(c *contextmodel.ReqContext) respon
To: time.Unix(to, 0),
Labels: labels,
}
frame, err := srv.hist.QueryStates(c.Req.Context(), query)
frame, err := srv.hist.Query(c.Req.Context(), query)
if err != nil {
return ErrResp(http.StatusInternalServerError, err, "")
}

View File

@ -386,11 +386,38 @@ func configureHistorianBackend(ctx context.Context, cfg setting.UnifiedAlertingS
return historian.NewNopHistorian(), nil
}
met.Info.WithLabelValues(cfg.Backend).Set(1)
if cfg.Backend == "annotations" {
backend, err := historian.ParseBackendType(cfg.Backend)
if err != nil {
return nil, err
}
met.Info.WithLabelValues(backend.String()).Set(1)
if backend == historian.BackendTypeMultiple {
primaryCfg := cfg
primaryCfg.Backend = cfg.MultiPrimary
primary, err := configureHistorianBackend(ctx, primaryCfg, ar, ds, rs, met, l)
if err != nil {
return nil, fmt.Errorf("multi-backend target \"%s\" was misconfigured: %w", cfg.MultiPrimary, err)
}
var secondaries []historian.Backend
for _, b := range cfg.MultiSecondaries {
secCfg := cfg
secCfg.Backend = b
sec, err := configureHistorianBackend(ctx, secCfg, ar, ds, rs, met, l)
if err != nil {
return nil, fmt.Errorf("multi-backend target \"%s\" was miconfigured: %w", b, err)
}
secondaries = append(secondaries, sec)
}
l.Info("State history is operating in multi-backend mode", "primary", cfg.MultiPrimary, "secondaries", cfg.MultiSecondaries)
return historian.NewMultipleBackend(primary, secondaries...), nil
}
if backend == historian.BackendTypeAnnotations {
return historian.NewAnnotationBackend(ar, ds, rs, met), nil
}
if cfg.Backend == "loki" {
if backend == historian.BackendTypeLoki {
lcfg, err := historian.NewLokiConfig(cfg)
if err != nil {
return nil, fmt.Errorf("invalid remote loki configuration: %w", err)
@ -405,9 +432,9 @@ func configureHistorianBackend(ctx context.Context, cfg setting.UnifiedAlertingS
}
return backend, nil
}
if cfg.Backend == "sql" {
if backend == historian.BackendTypeSQL {
return historian.NewSqlBackend(), nil
}
return nil, fmt.Errorf("unrecognized state history backend: %s", cfg.Backend)
return nil, fmt.Errorf("unrecognized state history backend: %s", backend)
}

View File

@ -73,6 +73,37 @@ func TestConfigureHistorianBackend(t *testing.T) {
require.ErrorContains(t, err, "unrecognized")
})
t.Run("fail initialization if invalid multi-backend primary", func(t *testing.T) {
met := metrics.NewHistorianMetrics(prometheus.NewRegistry())
logger := log.NewNopLogger()
cfg := setting.UnifiedAlertingStateHistorySettings{
Enabled: true,
Backend: "multiple",
MultiPrimary: "invalid-backend",
}
_, err := configureHistorianBackend(context.Background(), cfg, nil, nil, nil, met, logger)
require.ErrorContains(t, err, "multi-backend target")
require.ErrorContains(t, err, "unrecognized")
})
t.Run("fail initialization if invalid multi-backend secondary", func(t *testing.T) {
met := metrics.NewHistorianMetrics(prometheus.NewRegistry())
logger := log.NewNopLogger()
cfg := setting.UnifiedAlertingStateHistorySettings{
Enabled: true,
Backend: "multiple",
MultiPrimary: "annotations",
MultiSecondaries: []string{"sql", "invalid-backend"},
}
_, err := configureHistorianBackend(context.Background(), cfg, nil, nil, nil, met, logger)
require.ErrorContains(t, err, "multi-backend target")
require.ErrorContains(t, err, "unrecognized")
})
t.Run("do not fail initialization if pinging Loki fails", func(t *testing.T) {
met := metrics.NewHistorianMetrics(prometheus.NewRegistry())
logger := log.NewNopLogger()

View File

@ -54,8 +54,8 @@ func NewAnnotationBackend(annotations AnnotationStore, dashboards dashboards.Das
}
}
// RecordStates writes a number of state transitions for a given rule to state history.
func (h *AnnotationBackend) RecordStatesAsync(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error {
// Record writes a number of state transitions for a given rule to state history.
func (h *AnnotationBackend) Record(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error {
logger := h.log.FromContext(ctx)
// Build annotations before starting goroutine, to make sure all data is copied and won't mutate underneath us.
annotations := buildAnnotations(rule, states, logger)
@ -69,7 +69,8 @@ func (h *AnnotationBackend) RecordStatesAsync(ctx context.Context, rule history_
return errCh
}
func (h *AnnotationBackend) QueryStates(ctx context.Context, query ngmodels.HistoryQuery) (*data.Frame, error) {
// Query filters state history annotations and formats them into a dataframe.
func (h *AnnotationBackend) Query(ctx context.Context, query ngmodels.HistoryQuery) (*data.Frame, error) {
logger := h.log.FromContext(ctx)
if query.RuleUID == "" {
return nil, fmt.Errorf("ruleUID is required to query annotations")

View File

@ -37,7 +37,7 @@ func TestAnnotationHistorian(t *testing.T) {
RuleUID: "my-rule",
OrgID: 1,
}
frame, err := anns.QueryStates(context.Background(), q)
frame, err := anns.Query(context.Background(), q)
require.NoError(t, err)
require.NotNil(t, frame)
@ -55,7 +55,7 @@ func TestAnnotationHistorian(t *testing.T) {
Labels: data.Labels{"a": "b"},
})
err := <-anns.RecordStatesAsync(context.Background(), rule, states)
err := <-anns.Record(context.Background(), rule, states)
require.NoError(t, err)
})
@ -71,8 +71,8 @@ func TestAnnotationHistorian(t *testing.T) {
Labels: data.Labels{"a": "b"},
})
<-anns.RecordStatesAsync(context.Background(), rule, states)
<-errAnns.RecordStatesAsync(context.Background(), rule, states)
<-anns.Record(context.Background(), rule, states)
<-errAnns.Record(context.Background(), rule, states)
exp := bytes.NewBufferString(`
# HELP grafana_alerting_state_history_transitions_failed_total The total number of state transitions that failed to be written - they are not retried.

View File

@ -0,0 +1,39 @@
package historian
import (
"fmt"
"strings"
)
// BackendType identifies different kinds of state history backends.
type BackendType string
// String implements Stringer for BackendType.
func (bt BackendType) String() string {
return string(bt)
}
const (
BackendTypeAnnotations BackendType = "annotations"
BackendTypeLoki BackendType = "loki"
BackendTypeMultiple BackendType = "multiple"
BackendTypeNoop BackendType = "noop"
BackendTypeSQL BackendType = "sql"
)
func ParseBackendType(s string) (BackendType, error) {
norm := strings.ToLower(strings.TrimSpace(s))
types := map[BackendType]struct{}{
BackendTypeAnnotations: {},
BackendTypeLoki: {},
BackendTypeMultiple: {},
BackendTypeNoop: {},
BackendTypeSQL: {},
}
p := BackendType(norm)
if _, ok := types[p]; !ok {
return "", fmt.Errorf("unrecognized state history backend: %s", p)
}
return p, nil
}

View File

@ -44,6 +44,7 @@ type remoteLokiClient interface {
rangeQuery(ctx context.Context, selectors []Selector, start, end int64) (queryRes, error)
}
// RemoteLokibackend is a state.Historian that records state history to an external Loki instance.
type RemoteLokiBackend struct {
client remoteLokiClient
externalLabels map[string]string
@ -67,7 +68,8 @@ func (h *RemoteLokiBackend) TestConnection(ctx context.Context) error {
return h.client.ping(ctx)
}
func (h *RemoteLokiBackend) RecordStatesAsync(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error {
// Record writes a number of state transitions for a given rule to an external Loki instance.
func (h *RemoteLokiBackend) Record(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error {
logger := h.log.FromContext(ctx)
streams := statesToStreams(rule, states, h.externalLabels, logger)
errCh := make(chan error, 1)
@ -92,7 +94,8 @@ func (h *RemoteLokiBackend) RecordStatesAsync(ctx context.Context, rule history_
return errCh
}
func (h *RemoteLokiBackend) QueryStates(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
// Query retrieves state history entries from an external Loki instance and formats the results into a dataframe.
func (h *RemoteLokiBackend) Query(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
selectors, err := buildSelectors(query)
if err != nil {
return nil, fmt.Errorf("failed to build the provided selectors: %w", err)

View File

@ -43,6 +43,13 @@ func NewLokiConfig(cfg setting.UnifiedAlertingStateHistorySettings) (LokiConfig,
write = cfg.LokiRemoteURL
}
if read == "" {
return LokiConfig{}, fmt.Errorf("either read path URL or remote Loki URL must be provided")
}
if write == "" {
return LokiConfig{}, fmt.Errorf("either write path URL or remote Loki URL must be provided")
}
readURL, err := url.Parse(read)
if err != nil {
return LokiConfig{}, fmt.Errorf("failed to parse loki remote read URL: %w", err)

View File

@ -269,7 +269,7 @@ func TestRecordStates(t *testing.T) {
Labels: data.Labels{"a": "b"},
})
err := <-loki.RecordStatesAsync(context.Background(), rule, states)
err := <-loki.Record(context.Background(), rule, states)
require.NoError(t, err)
require.Contains(t, "/loki/api/v1/push", req.lastRequest.URL.Path)
@ -286,8 +286,8 @@ func TestRecordStates(t *testing.T) {
Labels: data.Labels{"a": "b"},
})
<-loki.RecordStatesAsync(context.Background(), rule, states)
<-errLoki.RecordStatesAsync(context.Background(), rule, states)
<-loki.Record(context.Background(), rule, states)
<-errLoki.Record(context.Background(), rule, states)
exp := bytes.NewBufferString(`
# HELP grafana_alerting_state_history_transitions_failed_total The total number of state transitions that failed to be written - they are not retried.

View File

@ -0,0 +1,55 @@
package historian
import (
"context"
"errors"
"github.com/grafana/grafana-plugin-sdk-go/data"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
history_model "github.com/grafana/grafana/pkg/services/ngalert/state/historian/model"
)
type Backend interface {
Record(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error
Query(ctx context.Context, query ngmodels.HistoryQuery) (*data.Frame, error)
}
// MultipleBackend is a state.Historian that records history to multiple backends at once.
// Only one backend is used for reads. The backend selected for read traffic is called the primary and all others are called secondaries.
type MultipleBackend struct {
primary Backend
secondaries []Backend
}
func NewMultipleBackend(primary Backend, secondaries ...Backend) *MultipleBackend {
return &MultipleBackend{
primary: primary,
secondaries: secondaries,
}
}
func (h *MultipleBackend) Record(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error {
jobs := make([]<-chan error, 0, len(h.secondaries)+1) // One extra for the primary.
for _, b := range append([]Backend{h.primary}, h.secondaries...) {
jobs = append(jobs, b.Record(ctx, rule, states))
}
errCh := make(chan error, 1)
go func() {
defer close(errCh)
errs := make([]error, 0)
// Wait for all jobs to complete. Order doesn't matter here, as we always need to wait on the slowest job regardless.
for _, ch := range jobs {
err := <-ch
if err != nil {
errs = append(errs, err)
}
}
errCh <- errors.Join(errs...)
}()
return errCh
}
func (h *MultipleBackend) Query(ctx context.Context, query ngmodels.HistoryQuery) (*data.Frame, error) {
return h.primary.Query(ctx, query)
}

View File

@ -0,0 +1,78 @@
package historian
import (
"context"
"fmt"
"testing"
"github.com/grafana/grafana-plugin-sdk-go/data"
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
history_model "github.com/grafana/grafana/pkg/services/ngalert/state/historian/model"
"github.com/stretchr/testify/require"
)
func TestMultipleBackend(t *testing.T) {
t.Run("querying dispatches to primary", func(t *testing.T) {
one := &fakeBackend{resp: data.NewFrame("one")}
two := &fakeBackend{resp: data.NewFrame("two")}
three := &fakeBackend{resp: data.NewFrame("three")}
fan := NewMultipleBackend(one, two, three)
resp, err := fan.Query(context.Background(), ngmodels.HistoryQuery{})
require.NoError(t, err)
require.Equal(t, "one", resp.Name)
})
t.Run("writes dispatch to all", func(t *testing.T) {
one := &fakeBackend{}
two := &fakeBackend{}
three := &fakeBackend{}
fan := NewMultipleBackend(one, two, three)
rule := history_model.RuleMeta{}
vs := []state.StateTransition{{}}
err := <-fan.Record(context.Background(), rule, vs)
require.NoError(t, err)
require.NotEmpty(t, one.last)
require.NotEmpty(t, two.last)
require.NotEmpty(t, three.last)
})
t.Run("writes combine errors", func(t *testing.T) {
one := &fakeBackend{err: fmt.Errorf("error one")}
two := &fakeBackend{err: fmt.Errorf("error two")}
three := &fakeBackend{}
fan := NewMultipleBackend(one, two, three)
rule := history_model.RuleMeta{}
vs := []state.StateTransition{{}}
err := <-fan.Record(context.Background(), rule, vs)
require.Error(t, err)
require.ErrorContains(t, err, "error one")
require.ErrorContains(t, err, "error two")
})
}
type fakeBackend struct {
resp *data.Frame
err error
last []state.StateTransition
}
func (f *fakeBackend) Record(ctx context.Context, rule history_model.RuleMeta, states []state.StateTransition) <-chan error {
ch := make(chan error, 1)
if f.err != nil {
ch <- f.err
}
f.last = states
defer close(ch)
return ch
}
func (f *fakeBackend) Query(ctx context.Context, query ngmodels.HistoryQuery) (*data.Frame, error) {
return f.resp, f.err
}

View File

@ -16,12 +16,12 @@ func NewNopHistorian() *NoOpHistorian {
return &NoOpHistorian{}
}
func (f *NoOpHistorian) RecordStatesAsync(ctx context.Context, _ history_model.RuleMeta, _ []state.StateTransition) <-chan error {
func (f *NoOpHistorian) Record(ctx context.Context, _ history_model.RuleMeta, _ []state.StateTransition) <-chan error {
errCh := make(chan error)
close(errCh)
return errCh
}
func (f *NoOpHistorian) QueryStates(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
func (f *NoOpHistorian) Query(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
return data.NewFrame("states"), nil
}

View File

@ -12,5 +12,5 @@ import (
// TODO: This package also contains implementations of this interface.
// TODO: This type should be moved to the side of the consumer, when the consumer is created in the future. We add it here temporarily to more clearly define this package's interface.
type Querier interface {
QueryStates(ctx context.Context, query models.HistoryQuery) (*data.Frame, error)
Query(ctx context.Context, query models.HistoryQuery) (*data.Frame, error)
}

View File

@ -21,12 +21,12 @@ func NewSqlBackend() *SqlBackend {
}
}
func (h *SqlBackend) RecordStatesAsync(ctx context.Context, _ history_model.RuleMeta, _ []state.StateTransition) <-chan error {
func (h *SqlBackend) Record(ctx context.Context, _ history_model.RuleMeta, _ []state.StateTransition) <-chan error {
errCh := make(chan error)
close(errCh)
return errCh
}
func (h *SqlBackend) QueryStates(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
func (h *SqlBackend) Query(ctx context.Context, query models.HistoryQuery) (*data.Frame, error) {
return data.NewFrame("states"), nil
}

View File

@ -222,7 +222,7 @@ func (st *Manager) ResetStateByRuleUID(ctx context.Context, rule *ngModels.Alert
}
ruleMeta := history_model.NewRuleMeta(rule, st.log)
errCh := st.historian.RecordStatesAsync(ctx, ruleMeta, transitions)
errCh := st.historian.Record(ctx, ruleMeta, transitions)
go func() {
err := <-errCh
if err != nil {
@ -250,7 +250,7 @@ func (st *Manager) ProcessEvalResults(ctx context.Context, evaluatedAt time.Time
allChanges := append(states, staleStates...)
if st.historian != nil {
st.historian.RecordStatesAsync(ctx, history_model.NewRuleMeta(alertRule, logger), allChanges)
st.historian.Record(ctx, history_model.NewRuleMeta(alertRule, logger), allChanges)
}
return allChanges
}

View File

@ -26,7 +26,7 @@ type Historian interface {
// RecordStates writes a number of state transitions for a given rule to state history. It returns a channel that
// is closed when writing the state transitions has completed. If an error has occurred, the channel will contain a
// non-nil error.
RecordStatesAsync(ctx context.Context, rule history_model.RuleMeta, states []StateTransition) <-chan error
Record(ctx context.Context, rule history_model.RuleMeta, states []StateTransition) <-chan error
}
// ImageCapturer captures images.

View File

@ -65,7 +65,7 @@ type FakeHistorian struct {
StateTransitions []StateTransition
}
func (f *FakeHistorian) RecordStatesAsync(ctx context.Context, rule history_model.RuleMeta, states []StateTransition) <-chan error {
func (f *FakeHistorian) Record(ctx context.Context, rule history_model.RuleMeta, states []StateTransition) <-chan error {
f.StateTransitions = append(f.StateTransitions, states...)
errCh := make(chan error)
close(errCh)

View File

@ -110,6 +110,8 @@ type UnifiedAlertingStateHistorySettings struct {
// if one of them is set.
LokiBasicAuthPassword string
LokiBasicAuthUsername string
MultiPrimary string
MultiSecondaries []string
ExternalLabels map[string]string
}
@ -330,6 +332,8 @@ func (cfg *Cfg) ReadUnifiedAlertingSettings(iniFile *ini.File) error {
LokiTenantID: stateHistory.Key("loki_tenant_id").MustString(""),
LokiBasicAuthUsername: stateHistory.Key("loki_basic_auth_username").MustString(""),
LokiBasicAuthPassword: stateHistory.Key("loki_basic_auth_password").MustString(""),
MultiPrimary: stateHistory.Key("primary").MustString(""),
MultiSecondaries: splitTrim(stateHistory.Key("secondaries").MustString(""), ","),
ExternalLabels: stateHistoryLabels.KeysHash(),
}
uaCfg.StateHistory = uaCfgStateHistory
@ -341,3 +345,11 @@ func (cfg *Cfg) ReadUnifiedAlertingSettings(iniFile *ini.File) error {
func GetAlertmanagerDefaultConfiguration() string {
return alertmanagerDefaultConfiguration
}
func splitTrim(s string, sep string) []string {
spl := strings.Split(s, sep)
for i := range spl {
spl[i] = strings.TrimSpace(spl[i])
}
return spl
}