mirror of
https://github.com/grafana/grafana.git
synced 2024-11-29 04:04:00 -06:00
Alerting: Create writer interface for recording rules (#88459)
* Create writer interface for recording rules Also create fake impl + use it for stub in scheduler
This commit is contained in:
parent
4e99cd2860
commit
5de7d4d06d
@ -40,6 +40,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/state"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/state/historian"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/store"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/writer"
|
||||
"github.com/grafana/grafana/pkg/services/notifications"
|
||||
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
|
||||
"github.com/grafana/grafana/pkg/services/quota"
|
||||
@ -303,6 +304,8 @@ func (ng *AlertNG) init() error {
|
||||
AlertSender: alertsRouter,
|
||||
Tracer: ng.tracer,
|
||||
Log: log.New("ngalert.scheduler"),
|
||||
//TODO: replace with real writer impl
|
||||
RecordingWriter: writer.FakeWriter{},
|
||||
}
|
||||
|
||||
// There are a set of feature toggles available that act as short-circuits for common configurations.
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/state"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/writer"
|
||||
"github.com/grafana/grafana/pkg/services/org"
|
||||
"github.com/grafana/grafana/pkg/services/user"
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
@ -56,6 +57,7 @@ func newRuleFactory(
|
||||
met *metrics.Scheduler,
|
||||
logger log.Logger,
|
||||
tracer tracing.Tracer,
|
||||
recordingWriter writer.Writer,
|
||||
evalAppliedHook evalAppliedFunc,
|
||||
stopAppliedHook stopAppliedFunc,
|
||||
) ruleFactoryFunc {
|
||||
@ -70,6 +72,7 @@ func newRuleFactory(
|
||||
logger,
|
||||
met,
|
||||
tracer,
|
||||
recordingWriter,
|
||||
)
|
||||
}
|
||||
return newAlertRule(
|
||||
|
@ -765,5 +765,5 @@ func TestRuleRoutine(t *testing.T) {
|
||||
}
|
||||
|
||||
func ruleFactoryFromScheduler(sch *schedule) ruleFactory {
|
||||
return newRuleFactory(sch.appURL, sch.disableGrafanaFolder, sch.maxAttempts, sch.alertsSender, sch.stateManager, sch.evaluatorFactory, &sch.schedulableAlertRules, sch.clock, sch.featureToggles, sch.metrics, sch.log, sch.tracer, sch.evalAppliedFunc, sch.stopAppliedFunc)
|
||||
return newRuleFactory(sch.appURL, sch.disableGrafanaFolder, sch.maxAttempts, sch.alertsSender, sch.stateManager, sch.evaluatorFactory, &sch.schedulableAlertRules, sch.clock, sch.featureToggles, sch.metrics, sch.log, sch.tracer, sch.recordingWriter, sch.evalAppliedFunc, sch.stopAppliedFunc)
|
||||
}
|
||||
|
@ -7,12 +7,14 @@ import (
|
||||
|
||||
"github.com/benbjohnson/clock"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/eval"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/writer"
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
@ -36,9 +38,11 @@ type recordingRule struct {
|
||||
logger log.Logger
|
||||
metrics *metrics.Scheduler
|
||||
tracer tracing.Tracer
|
||||
|
||||
writer writer.Writer
|
||||
}
|
||||
|
||||
func newRecordingRule(parent context.Context, maxAttempts int64, clock clock.Clock, evalFactory eval.EvaluatorFactory, ft featuremgmt.FeatureToggles, logger log.Logger, metrics *metrics.Scheduler, tracer tracing.Tracer) *recordingRule {
|
||||
func newRecordingRule(parent context.Context, maxAttempts int64, clock clock.Clock, evalFactory eval.EvaluatorFactory, ft featuremgmt.FeatureToggles, logger log.Logger, metrics *metrics.Scheduler, tracer tracing.Tracer, writer writer.Writer) *recordingRule {
|
||||
ctx, stop := util.WithCancelCause(parent)
|
||||
return &recordingRule{
|
||||
ctx: ctx,
|
||||
@ -51,6 +55,7 @@ func newRecordingRule(parent context.Context, maxAttempts int64, clock clock.Clo
|
||||
logger: logger,
|
||||
metrics: metrics,
|
||||
tracer: tracer,
|
||||
writer: writer,
|
||||
}
|
||||
}
|
||||
|
||||
@ -164,10 +169,10 @@ func (r *recordingRule) tryEvaluation(ctx context.Context, ev *Evaluation, logge
|
||||
evalAttemptFailures := r.metrics.EvalAttemptFailures.WithLabelValues(orgID)
|
||||
evalTotalFailures := r.metrics.EvalFailures.WithLabelValues(orgID)
|
||||
|
||||
start := r.clock.Now()
|
||||
evalStart := r.clock.Now()
|
||||
evalCtx := eval.NewContext(ctx, SchedulerUserFor(ev.rule.OrgID))
|
||||
result, err := r.buildAndExecutePipeline(ctx, evalCtx, ev, logger)
|
||||
dur := r.clock.Now().Sub(start)
|
||||
evalDur := r.clock.Now().Sub(evalStart)
|
||||
|
||||
evalAttemptTotal.Inc()
|
||||
span := trace.SpanFromContext(ctx)
|
||||
@ -184,10 +189,33 @@ func (r *recordingRule) tryEvaluation(ctx context.Context, ev *Evaluation, logge
|
||||
return fmt.Errorf("server side expressions pipeline returned an error: %w", err)
|
||||
}
|
||||
|
||||
logger.Debug("Alert rule evaluated", "results", result, "duration", dur)
|
||||
logger.Debug("Alert rule evaluated", "results", result, "duration", evalDur)
|
||||
span.AddEvent("rule evaluated", trace.WithAttributes(
|
||||
attribute.Int64("results", int64(len(result.Responses))),
|
||||
))
|
||||
|
||||
frames, err := r.frameRef(ev.rule.Record.From, result)
|
||||
if err != nil {
|
||||
span.SetStatus(codes.Error, "failed to extract frames from rule evaluation")
|
||||
span.RecordError(err)
|
||||
return fmt.Errorf("failed to extract frames from rule evaluation: %w", err)
|
||||
}
|
||||
|
||||
writeStart := r.clock.Now()
|
||||
err = r.writer.Write(ctx, ev.rule.Record.Metric, writeStart, frames, ev.rule.Labels)
|
||||
writeDur := r.clock.Now().Sub(writeStart)
|
||||
|
||||
if err != nil {
|
||||
span.SetStatus(codes.Error, "failed to write metrics")
|
||||
span.RecordError(err)
|
||||
return fmt.Errorf("metric remote write failed: %w", err)
|
||||
}
|
||||
|
||||
logger.Debug("Metrics written", "duration", writeDur)
|
||||
span.AddEvent("metrics written", trace.WithAttributes(
|
||||
attribute.Int64("frames", int64(len(frames))),
|
||||
))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -212,3 +240,17 @@ func (r *recordingRule) evaluationDoneTestHook(ev *Evaluation) {
|
||||
|
||||
r.evalAppliedHook(ev.rule.GetKey(), ev.scheduledAt)
|
||||
}
|
||||
|
||||
func (r *recordingRule) frameRef(refID string, resp *backend.QueryDataResponse) (data.Frames, error) {
|
||||
if len(resp.Responses) == 0 {
|
||||
return nil, fmt.Errorf("no responses returned from rule evaluation")
|
||||
}
|
||||
|
||||
for ref, resp := range resp.Responses {
|
||||
if ref == refID {
|
||||
return resp.Frames, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("no response with refID %s found in rule evaluation", refID)
|
||||
}
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
models "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/writer"
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/testutil"
|
||||
@ -146,7 +147,7 @@ func TestRecordingRule(t *testing.T) {
|
||||
|
||||
func blankRecordingRuleForTests(ctx context.Context) *recordingRule {
|
||||
ft := featuremgmt.WithFeatures(featuremgmt.FlagGrafanaManagedRecordingRules)
|
||||
return newRecordingRule(context.Background(), 0, nil, nil, ft, log.NewNopLogger(), nil, nil)
|
||||
return newRecordingRule(context.Background(), 0, nil, nil, ft, log.NewNopLogger(), nil, nil, writer.FakeWriter{})
|
||||
}
|
||||
|
||||
func TestRecordingRule_Integration(t *testing.T) {
|
||||
|
@ -19,6 +19,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
ngmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/state"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/writer"
|
||||
"github.com/grafana/grafana/pkg/util/ticker"
|
||||
)
|
||||
|
||||
@ -92,6 +93,8 @@ type schedule struct {
|
||||
schedulableAlertRules alertRulesRegistry
|
||||
|
||||
tracer tracing.Tracer
|
||||
|
||||
recordingWriter writer.Writer
|
||||
}
|
||||
|
||||
// SchedulerCfg is the scheduler configuration.
|
||||
@ -110,6 +113,7 @@ type SchedulerCfg struct {
|
||||
AlertSender AlertsSender
|
||||
Tracer tracing.Tracer
|
||||
Log log.Logger
|
||||
RecordingWriter writer.Writer
|
||||
}
|
||||
|
||||
// NewScheduler returns a new scheduler.
|
||||
@ -138,6 +142,7 @@ func NewScheduler(cfg SchedulerCfg, stateManager *state.Manager) *schedule {
|
||||
schedulableAlertRules: alertRulesRegistry{rules: make(map[ngmodels.AlertRuleKey]*ngmodels.AlertRule)},
|
||||
alertsSender: cfg.AlertSender,
|
||||
tracer: cfg.Tracer,
|
||||
recordingWriter: cfg.RecordingWriter,
|
||||
}
|
||||
|
||||
return &sch
|
||||
@ -254,6 +259,7 @@ func (sch *schedule) processTick(ctx context.Context, dispatcherGroup *errgroup.
|
||||
sch.metrics,
|
||||
sch.log,
|
||||
sch.tracer,
|
||||
sch.recordingWriter,
|
||||
sch.evalAppliedFunc,
|
||||
sch.stopAppliedFunc,
|
||||
)
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/metrics"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/state"
|
||||
"github.com/grafana/grafana/pkg/services/ngalert/writer"
|
||||
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
|
||||
"github.com/grafana/grafana/pkg/setting"
|
||||
)
|
||||
@ -459,6 +460,8 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor
|
||||
MaxAttempts: 1,
|
||||
}
|
||||
|
||||
fakeRecordingWriter := writer.FakeWriter{}
|
||||
|
||||
schedCfg := SchedulerCfg{
|
||||
BaseInterval: cfg.BaseInterval,
|
||||
MaxAttempts: cfg.MaxAttempts,
|
||||
@ -471,6 +474,7 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor
|
||||
AlertSender: senderMock,
|
||||
Tracer: testTracer,
|
||||
Log: log.New("ngalert.scheduler"),
|
||||
RecordingWriter: fakeRecordingWriter,
|
||||
}
|
||||
managerCfg := state.ManagerCfg{
|
||||
Metrics: m.GetStateMetrics(),
|
||||
|
20
pkg/services/ngalert/writer/fake.go
Normal file
20
pkg/services/ngalert/writer/fake.go
Normal file
@ -0,0 +1,20 @@
|
||||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
)
|
||||
|
||||
type FakeWriter struct {
|
||||
WriteFunc func(ctx context.Context, name string, t time.Time, frames data.Frames, extraLabels map[string]string) error
|
||||
}
|
||||
|
||||
func (w FakeWriter) Write(ctx context.Context, name string, t time.Time, frames data.Frames, extraLabels map[string]string) error {
|
||||
if w.WriteFunc == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return w.WriteFunc(ctx, name, t, frames, extraLabels)
|
||||
}
|
12
pkg/services/ngalert/writer/writer.go
Normal file
12
pkg/services/ngalert/writer/writer.go
Normal file
@ -0,0 +1,12 @@
|
||||
package writer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
)
|
||||
|
||||
type Writer interface {
|
||||
Write(ctx context.Context, name string, t time.Time, frames data.Frames, extraLabels map[string]string) error
|
||||
}
|
Loading…
Reference in New Issue
Block a user