Alerting: Detach condition validator from condition evaluator (#91150)

* Detach validator from evaluator

* Drop unnecessary interface and type
This commit is contained in:
Alexander Weaver 2024-07-30 10:55:37 -05:00 committed by GitHub
parent cf55ac5813
commit 4c71cadd5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 84 additions and 46 deletions

View File

@ -72,6 +72,7 @@ type API struct {
AlertRules *provisioning.AlertRuleService
AlertsRouter *sender.AlertsRouter
EvaluatorFactory eval.EvaluatorFactory
ConditionValidator *eval.ConditionValidator
FeatureManager featuremgmt.FeatureToggles
Historian Historian
Tracer tracing.Tracer
@ -120,7 +121,7 @@ func (api *API) RegisterAPIEndpoints(m *metrics.API) {
api.DatasourceCache,
NewLotexRuler(proxy, logger),
&RulerSrv{
conditionValidator: api.EvaluatorFactory,
conditionValidator: api.ConditionValidator,
QuotaService: api.QuotaService,
store: api.RuleStore,
provenanceStore: api.ProvenanceStore,

View File

@ -20,18 +20,14 @@ import (
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/expr/classic"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
"github.com/grafana/grafana/pkg/setting"
)
var logger = log.New("ngalert.eval")
type EvaluatorFactory interface {
// Validate validates that the condition is correct. Returns nil if the condition is correct. Otherwise, error that describes the failure
Validate(ctx EvaluationContext, condition models.Condition) error
// Create builds an evaluator pipeline ready to evaluate a rule's query
Create(ctx EvaluationContext, condition models.Condition) (ConditionEvaluator, error)
}
@ -112,21 +108,18 @@ type evaluatorImpl struct {
evaluationResultLimit int
dataSourceCache datasources.CacheService
expressionService expressionBuilder
pluginsStore pluginstore.Store
}
func NewEvaluatorFactory(
cfg setting.UnifiedAlertingSettings,
datasourceCache datasources.CacheService,
expressionService *expr.Service,
pluginsStore pluginstore.Store,
) EvaluatorFactory {
return &evaluatorImpl{
evaluationTimeout: cfg.EvaluationTimeout,
evaluationResultLimit: cfg.EvaluationResultLimit,
dataSourceCache: datasourceCache,
expressionService: expressionService,
pluginsStore: pluginsStore,
}
}
@ -825,36 +818,6 @@ func (evalResults Results) AsDataFrame() data.Frame {
return *frame
}
func (e *evaluatorImpl) Validate(ctx EvaluationContext, condition models.Condition) error {
req, err := getExprRequest(ctx, condition, e.dataSourceCache, ctx.AlertingResultsReader)
if err != nil {
return err
}
for _, query := range req.Queries {
if query.DataSource == nil {
continue
}
switch expr.NodeTypeFromDatasourceUID(query.DataSource.UID) {
case expr.TypeDatasourceNode:
p, found := e.pluginsStore.Plugin(ctx.Ctx, query.DataSource.Type)
if !found { // technically this should fail earlier during datasource resolution phase.
return fmt.Errorf("datasource refID %s could not be found: %w", query.RefID, plugins.ErrPluginUnavailable)
}
if !p.Backend {
return fmt.Errorf("datasource refID %s is not a backend datasource", query.RefID)
}
case expr.TypeMLNode:
_, found := e.pluginsStore.Plugin(ctx.Ctx, query.DataSource.Type)
if !found {
return fmt.Errorf("datasource refID %s could not be found: %w", query.RefID, plugins.ErrPluginUnavailable)
}
case expr.TypeCMDNode:
}
}
_, err = e.create(condition, req)
return err
}
func (e *evaluatorImpl) Create(ctx EvaluationContext, condition models.Condition) (ConditionEvaluator, error) {
if len(condition.Data) == 0 {
return nil, errors.New("expression list is empty. must be at least 1 expression")
@ -887,5 +850,5 @@ func (e *evaluatorImpl) create(condition models.Condition, req *expr.Request) (C
}
conditions = append(conditions, node.RefID())
}
return nil, fmt.Errorf("condition %s does not exist, must be one of %v", condition.Condition, conditions)
return nil, models.ErrConditionNotExist(condition.Condition, conditions)
}

View File

@ -591,10 +591,11 @@ func TestValidate(t *testing.T) {
pluginsStore: store,
})
evaluator := NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, cacheService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, featuremgmt.WithFeatures(), nil, tracing.InitializeTracerForTest()), store)
expressions := expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, featuremgmt.WithFeatures(), nil, tracing.InitializeTracerForTest())
validator := NewConditionValidator(cacheService, expressions, store)
evalCtx := NewContext(context.Background(), u)
err := evaluator.Validate(evalCtx, condition)
err := validator.Validate(evalCtx, condition)
if testCase.error {
require.Error(t, err)
} else {
@ -709,7 +710,7 @@ func TestCreate_HysteresisCommand(t *testing.T) {
cache: cacheService,
pluginsStore: store,
})
evaluator := NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, cacheService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, featuremgmt.WithFeatures(featuremgmt.FlagRecoveryThreshold), nil, tracing.InitializeTracerForTest()), store)
evaluator := NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, cacheService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, featuremgmt.WithFeatures(featuremgmt.FlagRecoveryThreshold), nil, tracing.InitializeTracerForTest()))
evalCtx := NewContextWithPreviousResults(context.Background(), u, testCase.reader)
eval, err := evaluator.Create(evalCtx, condition)

View File

@ -0,0 +1,65 @@
package eval
import (
"fmt"
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
)
type ConditionValidator struct {
dataSourceCache datasources.CacheService
pluginsStore pluginstore.Store
expressionService expressionBuilder
}
func NewConditionValidator(datasourceCache datasources.CacheService, expressionService *expr.Service, pluginsStore pluginstore.Store) *ConditionValidator {
return &ConditionValidator{
dataSourceCache: datasourceCache,
expressionService: expressionService,
pluginsStore: pluginsStore,
}
}
func (e *ConditionValidator) Validate(ctx EvaluationContext, condition models.Condition) error {
req, err := getExprRequest(ctx, condition, e.dataSourceCache, ctx.AlertingResultsReader)
if err != nil {
return err
}
for _, query := range req.Queries {
if query.DataSource == nil {
continue
}
switch expr.NodeTypeFromDatasourceUID(query.DataSource.UID) {
case expr.TypeDatasourceNode:
p, found := e.pluginsStore.Plugin(ctx.Ctx, query.DataSource.Type)
if !found { // technically this should fail earlier during datasource resolution phase.
return fmt.Errorf("datasource refID %s could not be found: %w", query.RefID, plugins.ErrPluginUnavailable)
}
if !p.Backend {
return fmt.Errorf("datasource refID %s is not a backend datasource", query.RefID)
}
case expr.TypeMLNode:
_, found := e.pluginsStore.Plugin(ctx.Ctx, query.DataSource.Type)
if !found {
return fmt.Errorf("datasource refID %s could not be found: %w", query.RefID, plugins.ErrPluginUnavailable)
}
case expr.TypeCMDNode:
}
}
pipeline, err := e.expressionService.BuildPipeline(req)
if err != nil {
return err
}
refIDs := make([]string, 0, len(pipeline))
for _, node := range pipeline {
if node.RefID() == condition.Condition {
return nil
}
refIDs = append(refIDs, node.RefID())
}
return models.ErrConditionNotExist(condition.Condition, refIDs)
}

View File

@ -1,6 +1,8 @@
package models
import (
"fmt"
"github.com/grafana/grafana/pkg/apimachinery/errutil"
)
@ -10,6 +12,7 @@ var (
MustTemplate(errAlertRuleConflictMsg, errutil.WithPublic(errAlertRuleConflictMsg))
ErrAlertRuleGroupNotFound = errutil.NotFound("alerting.alert-rule.notFound")
ErrInvalidRelativeTimeRangeBase = errutil.BadRequest("alerting.alert-rule.invalidRelativeTime").MustTemplate("Invalid alert rule query {{ .Public.RefID }}: invalid relative time range [From: {{ .Public.From }}, To: {{ .Public.To }}]")
ErrConditionNotExistBase = errutil.BadRequest("alerting.alert-rule.conditionNotExist").MustTemplate("Condition {{ .Public.Given }} does not exist, must be one of {{ .Public.Existing }}")
)
func ErrAlertRuleConflict(rule AlertRule, underlying error) error {
@ -19,3 +22,7 @@ func ErrAlertRuleConflict(rule AlertRule, underlying error) error {
func ErrInvalidRelativeTimeRange(refID string, rtr RelativeTimeRange) error {
return ErrInvalidRelativeTimeRangeBase.Build(errutil.TemplateData{Public: map[string]any{"RefID": refID, "From": rtr.From, "To": rtr.To}})
}
func ErrConditionNotExist(given string, existing []string) error {
return ErrConditionNotExistBase.Build(errutil.TemplateData{Public: map[string]any{"Given": given, "Existing": fmt.Sprintf("%v", existing)}})
}

View File

@ -343,7 +343,8 @@ func (ng *AlertNG) init() error {
ng.AlertsRouter = alertsRouter
evalFactory := eval.NewEvaluatorFactory(ng.Cfg.UnifiedAlerting, ng.DataSourceCache, ng.ExpressionService, ng.pluginsStore)
evalFactory := eval.NewEvaluatorFactory(ng.Cfg.UnifiedAlerting, ng.DataSourceCache, ng.ExpressionService)
conditionValidator := eval.NewConditionValidator(ng.DataSourceCache, ng.ExpressionService, ng.pluginsStore)
recordingWriter, err := createRecordingWriter(ng.FeatureToggles, ng.Cfg.UnifiedAlerting.RecordingRules, ng.httpClientProvider, clk, ng.tracer, ng.Metrics.GetRemoteWriterMetrics())
if err != nil {
@ -461,6 +462,7 @@ func (ng *AlertNG) init() error {
AlertRules: alertRuleService,
AlertsRouter: alertsRouter,
EvaluatorFactory: evalFactory,
ConditionValidator: conditionValidator,
FeatureManager: ng.FeatureToggles,
AppUrl: appUrl,
Historian: history,

View File

@ -29,7 +29,6 @@ import (
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/ngalert/state"
"github.com/grafana/grafana/pkg/services/ngalert/writer"
"github.com/grafana/grafana/pkg/services/pluginsintegration/pluginstore"
"github.com/grafana/grafana/pkg/setting"
)
@ -65,7 +64,7 @@ func TestProcessTicks(t *testing.T) {
}
cacheServ := &datasources.FakeCacheService{}
evaluator := eval.NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, cacheServ, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, featuremgmt.WithFeatures(), nil, tracing.InitializeTracerForTest()), &pluginstore.FakePluginStore{})
evaluator := eval.NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, cacheServ, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, featuremgmt.WithFeatures(), nil, tracing.InitializeTracerForTest()))
schedCfg := SchedulerCfg{
BaseInterval: cfg.BaseInterval,
@ -437,7 +436,7 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor
var evaluator = evalMock
if evalMock == nil {
evaluator = eval.NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, &datasources.FakeCacheService{}, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, featuremgmt.WithFeatures(), nil, tracing.InitializeTracerForTest()), &pluginstore.FakePluginStore{})
evaluator = eval.NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, &datasources.FakeCacheService{}, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, featuremgmt.WithFeatures(), nil, tracing.InitializeTracerForTest()))
}
if registry == nil {