Alerting: Add support for tracing to alerting scheduler (#61057)

Yuri Tseretyan 2023-01-06 21:21:43 -05:00 committed by GitHub
parent a3e341f24b
commit 48f1db63ff
5 changed files with 60 additions and 5 deletions
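
In outline: the first file threads a tracing.Tracer from ProvideService into the scheduler's SchedulerCfg; the schedule.go hunks wrap each rule evaluation in ruleRoutine in a span that carries the rule identity as attributes and the evaluation outcome as events; the remaining files update tests and test helpers for the new constructor argument. A condensed sketch of the evaluation pattern, assembled from the hunks below (evalWithSpan, ruleInfo and evalFunc are illustrative stand-ins, not names from the commit):

// Sketch only: the span-per-evaluation pattern introduced in ruleRoutine,
// condensed into one standalone function. Names not present in the hunks
// below (evalWithSpan, ruleInfo, evalFunc) are illustrative stand-ins.
import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/attribute"

	"github.com/grafana/grafana/pkg/infra/tracing"
)

type ruleInfo struct {
	UID     string
	OrgID   int64
	Version int64
}

func evalWithSpan(ctx context.Context, tracer tracing.Tracer, rule ruleInfo, evalFunc func(context.Context) (int, error)) {
	// One span per evaluation attempt; rule identity and version become span attributes.
	tracingCtx, span := tracer.Start(ctx, "alert rule execution")
	defer span.End()
	span.SetAttributes("rule_uid", rule.UID, attribute.String("rule_uid", rule.UID))
	span.SetAttributes("org_id", rule.OrgID, attribute.Int64("org_id", rule.OrgID))
	span.SetAttributes("rule_version", rule.Version, attribute.Int64("rule_version", rule.Version))

	resultCount, err := evalFunc(tracingCtx)
	if err != nil {
		// Failed evaluations record the error plus an explanatory pair of events.
		span.RecordError(err)
		span.AddEvents(
			[]string{"error", "message"},
			[]tracing.EventValue{
				{Str: fmt.Sprintf("%v", err)},
				{Str: "rule evaluation failed"},
			})
		return
	}
	// Successful evaluations leave a breadcrumb with the number of results.
	span.AddEvents(
		[]string{"message", "results"},
		[]tracing.EventValue{
			{Str: "rule evaluated"},
			{Num: int64(resultCount)},
		})
}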

View File

@@ -15,6 +15,7 @@ import (
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/kvstore"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/accesscontrol"
"github.com/grafana/grafana/pkg/services/annotations"
@@ -64,6 +65,7 @@ func ProvideService(
accesscontrolService accesscontrol.Service,
annotationsRepo annotations.Repository,
pluginsStore plugins.Store,
tracer tracing.Tracer,
) (*AlertNG, error) {
ng := &AlertNG{
Cfg: cfg,
@@ -88,6 +90,7 @@ func ProvideService(
accesscontrolService: accesscontrolService,
annotationsRepo: annotationsRepo,
pluginsStore: pluginsStore,
tracer: tracer,
}
if ng.IsDisabled() {
@@ -134,6 +137,7 @@ type AlertNG struct {
bus bus.Bus
pluginsStore plugins.Store
tracer tracing.Tracer
}
func (ng *AlertNG) init() error {
@@ -198,6 +202,7 @@ func (ng *AlertNG) init() error {
RuleStore: store,
Metrics: ng.Metrics.GetSchedulerMetrics(),
AlertSender: alertsRouter,
Tracer: ng.tracer,
}
history, err := configureHistorianBackend(ng.Cfg.UnifiedAlerting.StateHistory, ng.annotationsRepo, ng.dashboardService)

View File

@@ -7,9 +7,12 @@ import (
"net/url"
"time"
"github.com/hashicorp/go-multierror"
prometheusModel "github.com/prometheus/common/model"
"go.opentelemetry.io/otel/attribute"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
@@ -93,6 +96,8 @@ type schedule struct {
// current tick depends on its evaluation interval and when it was
// last evaluated.
schedulableAlertRules alertRulesRegistry
tracer tracing.Tracer
}
// SchedulerCfg is the scheduler configuration.
@@ -107,6 +112,7 @@ type SchedulerCfg struct {
RuleStore RulesStore
Metrics *metrics.Scheduler
AlertSender AlertsSender
Tracer tracing.Tracer
}
// NewScheduler returns a new schedule.
@@ -126,6 +132,7 @@ func NewScheduler(cfg SchedulerCfg, stateManager *state.Manager) *schedule {
minRuleInterval: cfg.MinRuleInterval,
schedulableAlertRules: alertRulesRegistry{rules: make(map[ngmodels.AlertRuleKey]*ngmodels.AlertRule)},
alertsSender: cfg.AlertSender,
tracer: cfg.Tracer,
}
return &sch
@@ -323,7 +330,7 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
}
}
evaluate := func(ctx context.Context, attempt int64, e *evaluation) {
evaluate := func(ctx context.Context, attempt int64, e *evaluation, span tracing.Span) {
logger := logger.New("version", e.rule.Version, "attempt", attempt, "now", e.scheduledAt)
start := sch.clock.Now()
@@ -363,8 +370,28 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
if results == nil {
results = append(results, eval.NewResultFromError(err, e.scheduledAt, dur))
}
if err == nil {
for _, result := range results {
if result.Error != nil {
err = multierror.Append(err, result.Error)
}
}
}
span.RecordError(err)
span.AddEvents(
[]string{"error", "message"},
[]tracing.EventValue{
{Str: fmt.Sprintf("%v", err)},
{Str: "rule evaluation failed"},
})
} else {
logger.Debug("Alert rule evaluated", "results", results, "duration", dur)
span.AddEvents(
[]string{"message", "results"},
[]tracing.EventValue{
{Str: "rule evaluated"},
{Num: int64(len(results))},
})
}
if ctx.Err() != nil { // check if the context is not cancelled. The evaluation can be a long-running task.
logger.Debug("Skip updating the state because the context has been cancelled")
@@ -372,6 +399,13 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
}
processedStates := sch.stateManager.ProcessEvalResults(ctx, e.scheduledAt, e.rule, results, sch.getRuleExtraLabels(e))
alerts := FromStateTransitionToPostableAlerts(processedStates, sch.stateManager, sch.appURL)
span.AddEvents(
[]string{"message", "state_transitions", "alerts_to_send"},
[]tracing.EventValue{
{Str: "results processed"},
{Num: int64(len(processedStates))},
{Num: int64(len(alerts.PostableAlerts))},
})
if len(alerts.PostableAlerts) > 0 {
sch.alertsSender.Send(key, alerts)
}
@@ -434,7 +468,16 @@ func (sch *schedule) ruleRoutine(grafanaCtx context.Context, key ngmodels.AlertR
}
currentRuleVersion = newVersion
}
evaluate(grafanaCtx, attempt, ctx)
tracingCtx, span := sch.tracer.Start(grafanaCtx, "alert rule execution")
defer span.End()
span.SetAttributes("rule_uid", ctx.rule.UID, attribute.String("rule_uid", ctx.rule.UID))
span.SetAttributes("org_id", ctx.rule.OrgID, attribute.Int64("org_id", ctx.rule.OrgID))
span.SetAttributes("rule_version", ctx.rule.Version, attribute.Int64("rule_version", ctx.rule.Version))
utcTick := ctx.scheduledAt.UTC().Format(time.RFC3339Nano)
span.SetAttributes("tick", utcTick, attribute.String("tick", utcTick))
evaluate(tracingCtx, attempt, ctx, span)
return nil
})
if err != nil {
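
One detail in the hunk above that can look like a copy-paste slip but is deliberate: each attribute is passed to span.SetAttributes twice, once as a plain key/value pair and once wrapped in an OpenTelemetry attribute.KeyValue. That matches the shape of Grafana's tracing.Span interface used here, which presumably hands the plain pair and the KeyValue to its different tracer backends. If the repetition is unwanted, small helpers along these lines (hypothetical, not part of the commit) keep each attribute to one call-site line:

// Hypothetical helpers, not part of this commit: collapse the doubled
// SetAttributes arguments into a single call per attribute.
func setStringAttr(span tracing.Span, key, value string) {
	span.SetAttributes(key, value, attribute.String(key, value))
}

func setInt64Attr(span tracing.Span, key string, value int64) {
	span.SetAttributes(key, value, attribute.Int64(key, value))
}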

View File

@@ -21,6 +21,7 @@ import (
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/ngalert/api/tooling/definitions"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
@@ -37,6 +38,7 @@ type evalAppliedInfo struct {
}
func TestProcessTicks(t *testing.T) {
testTracer := tracing.InitializeTracerForTest()
testMetrics := metrics.NewNGAlert(prometheus.NewPedanticRegistry())
ctx := context.Background()
dispatcherGroup, ctx := errgroup.WithContext(ctx)
@@ -67,6 +69,7 @@ func TestProcessTicks(t *testing.T) {
RuleStore: ruleStore,
Metrics: testMetrics.GetSchedulerMetrics(),
AlertSender: notifier,
Tracer: testTracer,
}
st := state.NewManager(testMetrics.GetStateMetrics(), nil, nil, &state.NoopImageService{}, mockedClock, &state.FakeHistorian{})
@@ -639,6 +642,7 @@ func TestSchedule_DeleteAlertRule(t *testing.T) {
func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStore, registry *prometheus.Registry, senderMock *AlertsSenderMock, evalMock eval.EvaluatorFactory) *schedule {
t.Helper()
testTracer := tracing.InitializeTracerForTest()
mockedClock := clock.NewMock()
@@ -684,6 +688,7 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor
RuleStore: rs,
Metrics: m.GetSchedulerMetrics(),
AlertSender: senderMock,
Tracer: testTracer,
}
st := state.NewManager(m.GetStateMetrics(), nil, is, &state.NoopImageService{}, mockedClock, &state.FakeHistorian{})

View File

@@ -95,12 +95,13 @@ func SetupTestEnv(tb testing.TB, baseInterval time.Duration) (*ngalert.AlertNG,
features, folderPermissions, dashboardPermissions, ac,
)
bus := bus.ProvideBus(tracing.InitializeTracerForTest())
tracer := tracing.InitializeTracerForTest()
bus := bus.ProvideBus(tracer)
folderService := folderimpl.ProvideService(ac, bus, cfg, dashboardService, dashboardStore, nil, features, folderPermissions, nil)
ng, err := ngalert.ProvideService(
cfg, &FakeFeatures{}, nil, nil, routing.NewRouteRegister(), sqlStore, nil, nil, nil, quotatest.New(false, nil),
secretsService, nil, m, folderService, ac, &dashboards.FakeDashboardService{}, nil, bus, ac, annotationstest.NewFakeAnnotationsRepo(), &plugins.FakePluginStore{},
secretsService, nil, m, folderService, ac, &dashboards.FakeDashboardService{}, nil, bus, ac, annotationstest.NewFakeAnnotationsRepo(), &plugins.FakePluginStore{}, tracer,
)
require.NoError(tb, err)
return ng, &store.DBstore{

View File

@@ -465,6 +465,7 @@ func getQuotaBySrvTargetScope(t *testing.T, quotaService quota.Service, srv quot
}
func setupEnv(t *testing.T, sqlStore *sqlstore.SQLStore, b bus.Bus, quotaService quota.Service) {
tracer := tracing.InitializeTracerForTest()
_, err := apikeyimpl.ProvideService(sqlStore, sqlStore.Cfg, quotaService)
require.NoError(t, err)
_, err = authimpl.ProvideUserAuthTokenService(sqlStore, nil, nil, featuremgmt.WithFeatures(), quotaService, sqlStore.Cfg)
@@ -478,7 +479,7 @@ func setupEnv(t *testing.T, sqlStore *sqlstore.SQLStore, b bus.Bus, quotaService
m := metrics.NewNGAlert(prometheus.NewRegistry())
_, err = ngalert.ProvideService(
sqlStore.Cfg, &ngalerttests.FakeFeatures{}, nil, nil, routing.NewRouteRegister(), sqlStore, nil, nil, nil, quotaService,
secretsService, nil, m, &foldertest.FakeService{}, &acmock.Mock{}, &dashboards.FakeDashboardService{}, nil, b, &acmock.Mock{}, annotationstest.NewFakeAnnotationsRepo(), &plugins.FakePluginStore{},
secretsService, nil, m, &foldertest.FakeService{}, &acmock.Mock{}, &dashboards.FakeDashboardService{}, nil, b, &acmock.Mock{}, annotationstest.NewFakeAnnotationsRepo(), &plugins.FakePluginStore{}, tracer,
)
require.NoError(t, err)
_, err = storesrv.ProvideService(sqlStore, featuremgmt.WithFeatures(), sqlStore.Cfg, quotaService, storesrv.ProvideSystemUsersService())