Alerting: Condition evaluator with cached pipeline (#57479)

* create rule evaluator
* load header from the context
* init one factory
* update scheduler
This commit is contained in:
Yuriy Tseretyan
2022-11-02 10:13:39 -04:00
committed by GitHub
parent d9c40ca41e
commit e3a4bde622
13 changed files with 371 additions and 340 deletions

View File

@@ -2,42 +2,19 @@ package eval
import (
"context"
"time"
"github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/services/user"
)
// EvaluationContext represents the context in which a condition is evaluated.
type EvaluationContext struct {
Ctx context.Context
User *user.SignedInUser
At time.Time
RuleUID string
Ctx context.Context
User *user.SignedInUser
}
func Context(ctx context.Context, user *user.SignedInUser) EvaluationContext {
return EvaluationContext{
Ctx: ctx,
User: user,
At: time.Now(),
}
}
func (c EvaluationContext) When(t time.Time) EvaluationContext {
c.At = t
return c
}
func (c EvaluationContext) WithRule(r *models.AlertRule) EvaluationContext {
if r != nil {
c.RuleUID = r.UID
}
return c
}
func (c EvaluationContext) WithTimeout(timeout time.Duration) (EvaluationContext, context.CancelFunc) {
timeoutCtx, cancel := context.WithTimeout(c.Ctx, timeout)
c.Ctx = timeoutCtx
return c, cancel
}

View File

@@ -3,6 +3,7 @@
package eval
import (
"context"
"errors"
"fmt"
"runtime/debug"
@@ -24,28 +25,72 @@ import (
var logger = log.New("ngalert.eval")
//go:generate mockery --name Evaluator --structname FakeEvaluator --inpackage --filename evaluator_mock.go --with-expecter
type Evaluator interface {
// ConditionEval executes conditions and evaluates the result.
ConditionEval(ctx EvaluationContext, condition models.Condition) Results
// QueriesAndExpressionsEval executes queries and expressions and returns the result.
QueriesAndExpressionsEval(ctx EvaluationContext, data []models.AlertQuery) (*backend.QueryDataResponse, error)
type EvaluatorFactory interface {
// Validate validates that the condition is correct. Returns nil if the condition is correct. Otherwise, error that describes the failure
Validate(ctx EvaluationContext, condition models.Condition) error
// BuildRuleEvaluator build an evaluator pipeline ready to evaluate a rule's query
Create(ctx EvaluationContext, condition models.Condition) (ConditionEvaluator, error)
}
//go:generate mockery --name ConditionEvaluator --structname ConditionEvaluatorMock --with-expecter --output eval_mocks --outpkg eval_mocks
type ConditionEvaluator interface {
// EvaluateRaw evaluates the condition and returns raw backend response backend.QueryDataResponse
EvaluateRaw(ctx context.Context, now time.Time) (resp *backend.QueryDataResponse, err error)
// Evaluate evaluates the condition and converts the response to Results
Evaluate(ctx context.Context, now time.Time) (Results, error)
}
type conditionEvaluator struct {
pipeline expr.DataPipeline
expressionService *expr.Service
condition models.Condition
evalTimeout time.Duration
}
func (r *conditionEvaluator) EvaluateRaw(ctx context.Context, now time.Time) (resp *backend.QueryDataResponse, err error) {
defer func() {
if e := recover(); e != nil {
logger.FromContext(ctx).Error("alert rule panic", "error", e, "stack", string(debug.Stack()))
panicErr := fmt.Errorf("alert rule panic; please check the logs for the full stack")
if err != nil {
err = fmt.Errorf("queries and expressions execution failed: %w; %v", err, panicErr.Error())
} else {
err = panicErr
}
}
}()
execCtx := ctx
if r.evalTimeout <= 0 {
timeoutCtx, cancel := context.WithTimeout(ctx, r.evalTimeout)
defer cancel()
execCtx = timeoutCtx
}
return r.expressionService.ExecutePipeline(execCtx, now, r.pipeline)
}
// Evaluate evaluates the condition and converts the response to Results
func (r *conditionEvaluator) Evaluate(ctx context.Context, now time.Time) (Results, error) {
response, err := r.EvaluateRaw(ctx, now)
if err != nil {
return nil, err
}
execResults := queryDataResponseToExecutionResults(r.condition, response)
return evaluateExecutionResult(execResults, now), nil
}
type evaluatorImpl struct {
cfg *setting.Cfg
evaluationTimeout time.Duration
dataSourceCache datasources.CacheService
expressionService *expr.Service
}
func NewEvaluator(
cfg *setting.Cfg,
func NewEvaluatorFactory(
cfg setting.UnifiedAlertingSettings,
datasourceCache datasources.CacheService,
expressionService *expr.Service) Evaluator {
expressionService *expr.Service) EvaluatorFactory {
return &evaluatorImpl{
cfg: cfg,
evaluationTimeout: cfg.EvaluationTimeout,
dataSourceCache: datasourceCache,
expressionService: expressionService,
}
@@ -122,6 +167,15 @@ type Result struct {
EvaluationString string
}
func NewResultFromError(err error, evaluatedAt time.Time, duration time.Duration) Result {
return Result{
State: Error,
Error: err,
EvaluatedAt: evaluatedAt,
EvaluationDuration: duration,
}
}
// State is an enum of the evaluation State for an alert instance.
type State int
@@ -169,8 +223,9 @@ func buildDatasourceHeaders(ctx EvaluationContext) map[string]string {
"X-Cache-Skip": "true",
}
if ctx.RuleUID != "" {
headers["X-Rule-Uid"] = ctx.RuleUID
key, ok := models.RuleKeyFromContext(ctx.Ctx)
if ok {
headers["X-Rule-Uid"] = key.UID
}
return headers
@@ -312,27 +367,6 @@ func queryDataResponseToExecutionResults(c models.Condition, execResp *backend.Q
return result
}
func executeQueriesAndExpressions(ctx EvaluationContext, data []models.AlertQuery, exprService *expr.Service, dsCacheService datasources.CacheService) (resp *backend.QueryDataResponse, err error) {
defer func() {
if e := recover(); e != nil {
logger.FromContext(ctx.Ctx).Error("alert rule panic", "error", e, "stack", string(debug.Stack()))
panicErr := fmt.Errorf("alert rule panic; please check the logs for the full stack")
if err != nil {
err = fmt.Errorf("queries and expressions execution failed: %w; %v", err, panicErr.Error())
} else {
err = panicErr
}
}
}()
queryDataReq, err := getExprRequest(ctx, data, dsCacheService)
if err != nil {
return nil, err
}
return exprService.TransformData(ctx.Ctx, ctx.At, queryDataReq)
}
// datasourceUIDsToRefIDs returns a sorted slice of Ref IDs for each Datasource UID.
//
// If refIDsToDatasourceUIDs is nil then this function also returns nil. Likewise,
@@ -399,12 +433,7 @@ func evaluateExecutionResult(execResults ExecutionResults, ts time.Time) Results
evalResults := make([]Result, 0)
appendErrRes := func(e error) {
evalResults = append(evalResults, Result{
State: Error,
Error: e,
EvaluatedAt: ts,
EvaluationDuration: time.Since(ts),
})
evalResults = append(evalResults, NewResultFromError(e, ts, time.Since(ts)))
}
appendNoData := func(labels data.Labels) {
@@ -560,55 +589,37 @@ func (evalResults Results) AsDataFrame() data.Frame {
return *frame
}
// ConditionEval executes conditions and evaluates the result.
func (e *evaluatorImpl) ConditionEval(ctx EvaluationContext, condition models.Condition) Results {
execResp, err := e.QueriesAndExpressionsEval(ctx, condition.Data)
var execResults ExecutionResults
if err != nil {
execResults = ExecutionResults{Error: err}
} else {
execResults = queryDataResponseToExecutionResults(condition, execResp)
}
return evaluateExecutionResult(execResults, ctx.At)
}
// QueriesAndExpressionsEval executes queries and expressions and returns the result.
func (e *evaluatorImpl) QueriesAndExpressionsEval(ctx EvaluationContext, data []models.AlertQuery) (*backend.QueryDataResponse, error) {
timeoutCtx, cancelFn := ctx.WithTimeout(e.cfg.UnifiedAlerting.EvaluationTimeout)
defer cancelFn()
execResult, err := executeQueriesAndExpressions(timeoutCtx, data, e.expressionService, e.dataSourceCache)
if err != nil {
return nil, fmt.Errorf("failed to execute conditions: %w", err)
}
return execResult, nil
}
func (e *evaluatorImpl) Validate(ctx EvaluationContext, condition models.Condition) error {
_, err := e.Create(ctx, condition)
return err
}
func (e *evaluatorImpl) Create(ctx EvaluationContext, condition models.Condition) (ConditionEvaluator, error) {
if len(condition.Data) == 0 {
return errors.New("expression list is empty. must be at least 1 expression")
return nil, errors.New("expression list is empty. must be at least 1 expression")
}
if len(condition.Condition) == 0 {
return errors.New("condition must not be empty")
return nil, errors.New("condition must not be empty")
}
ctx.At = time.Now()
req, err := getExprRequest(ctx, condition.Data, e.dataSourceCache)
if err != nil {
return err
return nil, err
}
pipeline, err := e.expressionService.BuildPipeline(req)
if err != nil {
return err
return nil, err
}
conditions := make([]string, 0, len(pipeline))
for _, node := range pipeline {
if node.RefID() == condition.Condition {
return nil
return &conditionEvaluator{
pipeline: pipeline,
expressionService: e.expressionService,
condition: condition,
evalTimeout: e.evaluationTimeout,
}, nil
}
conditions = append(conditions, node.RefID())
}
return fmt.Errorf("condition %s does not exist, must be one of %v", condition.Condition, conditions)
return nil, fmt.Errorf("condition %s does not exist, must be one of %v", condition.Condition, conditions)
}

View File

@@ -0,0 +1,137 @@
// Code generated by mockery v2.14.0. DO NOT EDIT.
package eval_mocks
import (
context "context"
backend "github.com/grafana/grafana-plugin-sdk-go/backend"
eval "github.com/grafana/grafana/pkg/services/ngalert/eval"
mock "github.com/stretchr/testify/mock"
time "time"
)
// ConditionEvaluatorMock is an autogenerated mock type for the ConditionEvaluator type
type ConditionEvaluatorMock struct {
mock.Mock
}
type ConditionEvaluatorMock_Expecter struct {
mock *mock.Mock
}
func (_m *ConditionEvaluatorMock) EXPECT() *ConditionEvaluatorMock_Expecter {
return &ConditionEvaluatorMock_Expecter{mock: &_m.Mock}
}
// Evaluate provides a mock function with given fields: ctx, now
func (_m *ConditionEvaluatorMock) Evaluate(ctx context.Context, now time.Time) (eval.Results, error) {
ret := _m.Called(ctx, now)
var r0 eval.Results
if rf, ok := ret.Get(0).(func(context.Context, time.Time) eval.Results); ok {
r0 = rf(ctx, now)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(eval.Results)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, time.Time) error); ok {
r1 = rf(ctx, now)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ConditionEvaluatorMock_Evaluate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Evaluate'
type ConditionEvaluatorMock_Evaluate_Call struct {
*mock.Call
}
// Evaluate is a helper method to define mock.On call
// - ctx context.Context
// - now time.Time
func (_e *ConditionEvaluatorMock_Expecter) Evaluate(ctx interface{}, now interface{}) *ConditionEvaluatorMock_Evaluate_Call {
return &ConditionEvaluatorMock_Evaluate_Call{Call: _e.mock.On("Evaluate", ctx, now)}
}
func (_c *ConditionEvaluatorMock_Evaluate_Call) Run(run func(ctx context.Context, now time.Time)) *ConditionEvaluatorMock_Evaluate_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(time.Time))
})
return _c
}
func (_c *ConditionEvaluatorMock_Evaluate_Call) Return(_a0 eval.Results, _a1 error) *ConditionEvaluatorMock_Evaluate_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// EvaluateRaw provides a mock function with given fields: ctx, now
func (_m *ConditionEvaluatorMock) EvaluateRaw(ctx context.Context, now time.Time) (*backend.QueryDataResponse, error) {
ret := _m.Called(ctx, now)
var r0 *backend.QueryDataResponse
if rf, ok := ret.Get(0).(func(context.Context, time.Time) *backend.QueryDataResponse); ok {
r0 = rf(ctx, now)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*backend.QueryDataResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(context.Context, time.Time) error); ok {
r1 = rf(ctx, now)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// ConditionEvaluatorMock_EvaluateRaw_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'EvaluateRaw'
type ConditionEvaluatorMock_EvaluateRaw_Call struct {
*mock.Call
}
// EvaluateRaw is a helper method to define mock.On call
// - ctx context.Context
// - now time.Time
func (_e *ConditionEvaluatorMock_Expecter) EvaluateRaw(ctx interface{}, now interface{}) *ConditionEvaluatorMock_EvaluateRaw_Call {
return &ConditionEvaluatorMock_EvaluateRaw_Call{Call: _e.mock.On("EvaluateRaw", ctx, now)}
}
func (_c *ConditionEvaluatorMock_EvaluateRaw_Call) Run(run func(ctx context.Context, now time.Time)) *ConditionEvaluatorMock_EvaluateRaw_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(context.Context), args[1].(time.Time))
})
return _c
}
func (_c *ConditionEvaluatorMock_EvaluateRaw_Call) Return(resp *backend.QueryDataResponse, err error) *ConditionEvaluatorMock_EvaluateRaw_Call {
_c.Call.Return(resp, err)
return _c
}
type mockConstructorTestingTNewConditionEvaluatorMock interface {
mock.TestingT
Cleanup(func())
}
// NewConditionEvaluatorMock creates a new instance of ConditionEvaluatorMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations.
func NewConditionEvaluatorMock(t mockConstructorTestingTNewConditionEvaluatorMock) *ConditionEvaluatorMock {
mock := &ConditionEvaluatorMock{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@@ -0,0 +1,32 @@
package eval_mocks
import (
"errors"
"github.com/grafana/grafana/pkg/services/ngalert/eval"
"github.com/grafana/grafana/pkg/services/ngalert/models"
)
type fakeEvaluatorFactory struct {
err error
evaluator eval.ConditionEvaluator
}
func NewEvaluatorFactory(evaluator eval.ConditionEvaluator) eval.EvaluatorFactory {
return &fakeEvaluatorFactory{evaluator: evaluator}
}
func NewFailingEvaluatorFactory(err error) eval.EvaluatorFactory {
if err == nil {
err = errors.New("test")
}
return &fakeEvaluatorFactory{err: err}
}
func (f fakeEvaluatorFactory) Validate(ctx eval.EvaluationContext, condition models.Condition) error {
return f.err
}
func (f fakeEvaluatorFactory) Create(ctx eval.EvaluationContext, condition models.Condition) (eval.ConditionEvaluator, error) {
return f.evaluator, f.err
}

View File

@@ -443,10 +443,10 @@ func TestValidate(t *testing.T) {
cacheService := &fakes.FakeCacheService{}
condition := testCase.condition(cacheService)
evaluator := NewEvaluator(&setting.Cfg{ExpressionsEnabled: true}, cacheService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil))
evaluator := NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, cacheService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil))
evalCtx := Context(context.Background(), u)
err := evaluator.Validate(evalCtx, condition)
_, err := evaluator.Create(evalCtx, condition)
if testCase.error {
require.Error(t, err)
} else {

View File

@@ -1,160 +0,0 @@
// Code generated by mockery v2.12.0. DO NOT EDIT.
package eval
import (
backend "github.com/grafana/grafana-plugin-sdk-go/backend"
mock "github.com/stretchr/testify/mock"
models "github.com/grafana/grafana/pkg/services/ngalert/models"
testing "testing"
)
// FakeEvaluator is an autogenerated mock type for the Evaluator type
type FakeEvaluator struct {
mock.Mock
}
type FakeEvaluator_Expecter struct {
mock *mock.Mock
}
func (_m *FakeEvaluator) EXPECT() *FakeEvaluator_Expecter {
return &FakeEvaluator_Expecter{mock: &_m.Mock}
}
// ConditionEval provides a mock function with given fields: ctx, condition
func (_m *FakeEvaluator) ConditionEval(ctx EvaluationContext, condition models.Condition) Results {
ret := _m.Called(ctx, condition)
var r0 Results
if rf, ok := ret.Get(0).(func(EvaluationContext, models.Condition) Results); ok {
r0 = rf(ctx, condition)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(Results)
}
}
return r0
}
// FakeEvaluator_ConditionEval_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'ConditionEval'
type FakeEvaluator_ConditionEval_Call struct {
*mock.Call
}
// ConditionEval is a helper method to define mock.On call
// - ctx EvaluationContext
// - condition models.Condition
func (_e *FakeEvaluator_Expecter) ConditionEval(ctx interface{}, condition interface{}) *FakeEvaluator_ConditionEval_Call {
return &FakeEvaluator_ConditionEval_Call{Call: _e.mock.On("ConditionEval", ctx, condition)}
}
func (_c *FakeEvaluator_ConditionEval_Call) Run(run func(ctx EvaluationContext, condition models.Condition)) *FakeEvaluator_ConditionEval_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(EvaluationContext), args[1].(models.Condition))
})
return _c
}
func (_c *FakeEvaluator_ConditionEval_Call) Return(_a0 Results) *FakeEvaluator_ConditionEval_Call {
_c.Call.Return(_a0)
return _c
}
// QueriesAndExpressionsEval provides a mock function with given fields: ctx, data
func (_m *FakeEvaluator) QueriesAndExpressionsEval(ctx EvaluationContext, data []models.AlertQuery) (*backend.QueryDataResponse, error) {
ret := _m.Called(ctx, data)
var r0 *backend.QueryDataResponse
if rf, ok := ret.Get(0).(func(EvaluationContext, []models.AlertQuery) *backend.QueryDataResponse); ok {
r0 = rf(ctx, data)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(*backend.QueryDataResponse)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(EvaluationContext, []models.AlertQuery) error); ok {
r1 = rf(ctx, data)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// FakeEvaluator_QueriesAndExpressionsEval_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueriesAndExpressionsEval'
type FakeEvaluator_QueriesAndExpressionsEval_Call struct {
*mock.Call
}
// QueriesAndExpressionsEval is a helper method to define mock.On call
// - ctx EvaluationContext
// - data []models.AlertQuery
func (_e *FakeEvaluator_Expecter) QueriesAndExpressionsEval(ctx interface{}, data interface{}) *FakeEvaluator_QueriesAndExpressionsEval_Call {
return &FakeEvaluator_QueriesAndExpressionsEval_Call{Call: _e.mock.On("QueriesAndExpressionsEval", ctx, data)}
}
func (_c *FakeEvaluator_QueriesAndExpressionsEval_Call) Run(run func(ctx EvaluationContext, data []models.AlertQuery)) *FakeEvaluator_QueriesAndExpressionsEval_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(EvaluationContext), args[1].([]models.AlertQuery))
})
return _c
}
func (_c *FakeEvaluator_QueriesAndExpressionsEval_Call) Return(_a0 *backend.QueryDataResponse, _a1 error) *FakeEvaluator_QueriesAndExpressionsEval_Call {
_c.Call.Return(_a0, _a1)
return _c
}
// Validate provides a mock function with given fields: ctx, condition
func (_m *FakeEvaluator) Validate(ctx EvaluationContext, condition models.Condition) error {
ret := _m.Called(ctx, condition)
var r0 error
if rf, ok := ret.Get(0).(func(EvaluationContext, models.Condition) error); ok {
r0 = rf(ctx, condition)
} else {
r0 = ret.Error(0)
}
return r0
}
// FakeEvaluator_Validate_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Validate'
type FakeEvaluator_Validate_Call struct {
*mock.Call
}
// Validate is a helper method to define mock.On call
// - ctx EvaluationContext
// - condition models.Condition
func (_e *FakeEvaluator_Expecter) Validate(ctx interface{}, condition interface{}) *FakeEvaluator_Validate_Call {
return &FakeEvaluator_Validate_Call{Call: _e.mock.On("Validate", ctx, condition)}
}
func (_c *FakeEvaluator_Validate_Call) Run(run func(ctx EvaluationContext, condition models.Condition)) *FakeEvaluator_Validate_Call {
_c.Call.Run(func(args mock.Arguments) {
run(args[0].(EvaluationContext), args[1].(models.Condition))
})
return _c
}
func (_c *FakeEvaluator_Validate_Call) Return(_a0 error) *FakeEvaluator_Validate_Call {
_c.Call.Return(_a0)
return _c
}
// NewFakeEvaluator creates a new instance of FakeEvaluator. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewFakeEvaluator(t testing.TB) *FakeEvaluator {
mock := &FakeEvaluator{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}