From 2d20c8db7b01b3f1f3c946cf0cc6c78c13a178fa Mon Sep 17 00:00:00 2001 From: Yuriy Tseretyan Date: Wed, 26 Oct 2022 16:13:58 -0400 Subject: [PATCH] Chore: Expression engine to support relative time range (#57474) * make TimeRange interface and add relative range * make Execute methods support the current time * update resample to support relative time range * update DSNode to support relative time range * update query service to create queries with absolute time * make alerting evaluator create relative time ranges --- pkg/expr/classic/classic.go | 3 ++- pkg/expr/classic/classic_test.go | 3 ++- pkg/expr/commands.go | 14 ++++++---- pkg/expr/commands_test.go | 7 ++--- pkg/expr/graph.go | 7 ++--- pkg/expr/graph_test.go | 4 +++ pkg/expr/nodes.go | 16 ++++++------ pkg/expr/service.go | 5 ++-- pkg/expr/service_test.go | 6 ++++- pkg/expr/threshold.go | 5 ++-- pkg/expr/threshold_test.go | 1 - pkg/expr/transform.go | 30 +++++++++++++++++++--- pkg/services/ngalert/eval/eval.go | 7 ++--- pkg/services/ngalert/models/alert_query.go | 9 +++---- pkg/services/query/query.go | 4 +-- 15 files changed, 79 insertions(+), 42 deletions(-) diff --git a/pkg/expr/classic/classic.go b/pkg/expr/classic/classic.go index 4bcc06a21a9..4b06f0ce65b 100644 --- a/pkg/expr/classic/classic.go +++ b/pkg/expr/classic/classic.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "strconv" + "time" "github.com/grafana/grafana-plugin-sdk-go/data" @@ -67,7 +68,7 @@ func (cmd *ConditionsCmd) NeedsVars() []string { // Execute runs the command and returns the results or an error if the command // failed to execute. -func (cmd *ConditionsCmd) Execute(_ context.Context, vars mathexp.Vars) (mathexp.Results, error) { +func (cmd *ConditionsCmd) Execute(_ context.Context, _ time.Time, vars mathexp.Vars) (mathexp.Results, error) { firing := true newRes := mathexp.Results{} noDataFound := true diff --git a/pkg/expr/classic/classic_test.go b/pkg/expr/classic/classic_test.go index ad679d1cb62..8b8d1e56855 100644 --- a/pkg/expr/classic/classic_test.go +++ b/pkg/expr/classic/classic_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "testing" + "time" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/stretchr/testify/require" @@ -405,7 +406,7 @@ func TestConditionsCmdExecute(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - res, err := tt.conditionsCmd.Execute(context.Background(), tt.vars) + res, err := tt.conditionsCmd.Execute(context.Background(), time.Now(), tt.vars) require.NoError(t, err) require.Equal(t, 1, len(res.Values)) diff --git a/pkg/expr/commands.go b/pkg/expr/commands.go index 2f32ed37760..16a29fc9512 100644 --- a/pkg/expr/commands.go +++ b/pkg/expr/commands.go @@ -16,7 +16,7 @@ import ( // Command is an interface for all expression commands. type Command interface { NeedsVars() []string - Execute(c context.Context, vars mathexp.Vars) (mathexp.Results, error) + Execute(ctx context.Context, now time.Time, vars mathexp.Vars) (mathexp.Results, error) } // MathCommand is a command for a math expression such as "1 + $GA / 2" @@ -66,7 +66,7 @@ func (gm *MathCommand) NeedsVars() []string { // Execute runs the command and returns the results or an error if the command // failed to execute. -func (gm *MathCommand) Execute(ctx context.Context, vars mathexp.Vars) (mathexp.Results, error) { +func (gm *MathCommand) Execute(_ context.Context, _ time.Time, vars mathexp.Vars) (mathexp.Results, error) { return gm.Expression.Execute(gm.refID, vars) } @@ -154,7 +154,7 @@ func (gr *ReduceCommand) NeedsVars() []string { // Execute runs the command and returns the results or an error if the command // failed to execute. -func (gr *ReduceCommand) Execute(_ context.Context, vars mathexp.Vars) (mathexp.Results, error) { +func (gr *ReduceCommand) Execute(_ context.Context, _ time.Time, vars mathexp.Vars) (mathexp.Results, error) { newRes := mathexp.Results{} for _, val := range vars[gr.VarToReduce].Values { switch v := val.(type) { @@ -210,6 +210,9 @@ func NewResampleCommand(refID, rawWindow, varToResample string, downsampler stri // UnmarshalResampleCommand creates a ResampleCMD from Grafana's frontend query. func UnmarshalResampleCommand(rn *rawNode) (*ResampleCommand, error) { + if rn.TimeRange == nil { + return nil, fmt.Errorf("time range must be specified for refID %s", rn.RefID) + } rawVar, ok := rn.Query["expression"] if !ok { return nil, errors.New("no expression ID to resample. must be a reference to an existing query or expression") @@ -259,14 +262,15 @@ func (gr *ResampleCommand) NeedsVars() []string { // Execute runs the command and returns the results or an error if the command // failed to execute. -func (gr *ResampleCommand) Execute(ctx context.Context, vars mathexp.Vars) (mathexp.Results, error) { +func (gr *ResampleCommand) Execute(_ context.Context, now time.Time, vars mathexp.Vars) (mathexp.Results, error) { newRes := mathexp.Results{} + timeRange := gr.TimeRange.AbsoluteTime(now) for _, val := range vars[gr.VarToResample].Values { series, ok := val.(mathexp.Series) if !ok { return newRes, fmt.Errorf("can only resample type series, got type %v", val.Type()) } - num, err := series.Resample(gr.refID, gr.Window, gr.Downsampler, gr.Upsampler, gr.TimeRange.From, gr.TimeRange.To) + num, err := series.Resample(gr.refID, gr.Window, gr.Downsampler, gr.Upsampler, timeRange.From, timeRange.To) if err != nil { return newRes, err } diff --git a/pkg/expr/commands_test.go b/pkg/expr/commands_test.go index 06408f5bd36..7faa68c6eeb 100644 --- a/pkg/expr/commands_test.go +++ b/pkg/expr/commands_test.go @@ -6,6 +6,7 @@ import ( "fmt" "math/rand" "testing" + "time" "github.com/grafana/grafana-plugin-sdk-go/data" "github.com/stretchr/testify/assert" @@ -80,7 +81,7 @@ func Test_UnmarshalReduceCommand_Settings(t *testing.T) { RefID: "A", Query: qmap, QueryType: "", - TimeRange: TimeRange{}, + TimeRange: RelativeTimeRange{}, DataSource: nil, }) @@ -114,7 +115,7 @@ func TestReduceExecute(t *testing.T) { }, } - execute, err := cmd.Execute(context.Background(), vars) + execute, err := cmd.Execute(context.Background(), time.Now(), vars) require.NoError(t, err) require.Len(t, execute.Values, len(numbers)) @@ -149,7 +150,7 @@ func TestReduceExecute(t *testing.T) { }, } - results, err := cmd.Execute(context.Background(), vars) + results, err := cmd.Execute(context.Background(), time.Now(), vars) require.NoError(t, err) require.Len(t, results.Values, 1) diff --git a/pkg/expr/graph.go b/pkg/expr/graph.go index 9f5a59d7415..d473af64566 100644 --- a/pkg/expr/graph.go +++ b/pkg/expr/graph.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "time" "github.com/grafana/grafana/pkg/expr/mathexp" @@ -37,7 +38,7 @@ type Node interface { ID() int64 // ID() allows the gonum graph node interface to be fulfilled NodeType() NodeType RefID() string - Execute(c context.Context, vars mathexp.Vars, s *Service) (mathexp.Results, error) + Execute(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service) (mathexp.Results, error) String() string } @@ -46,10 +47,10 @@ type DataPipeline []Node // execute runs all the command/datasource requests in the pipeline return a // map of the refId of the of each command -func (dp *DataPipeline) execute(c context.Context, s *Service) (mathexp.Vars, error) { +func (dp *DataPipeline) execute(c context.Context, now time.Time, s *Service) (mathexp.Vars, error) { vars := make(mathexp.Vars) for _, node := range *dp { - res, err := node.Execute(c, vars, s) + res, err := node.Execute(c, now, vars, s) if err != nil { return nil, err } diff --git a/pkg/expr/graph_test.go b/pkg/expr/graph_test.go index 325118bafaf..185c4467a87 100644 --- a/pkg/expr/graph_test.go +++ b/pkg/expr/graph_test.go @@ -34,6 +34,7 @@ func TestServicebuildPipeLine(t *testing.T) { DataSource: &datasources.DataSource{ Uid: "Fake", }, + TimeRange: AbsoluteTimeRange{}, }, }, }, @@ -144,6 +145,7 @@ func TestServicebuildPipeLine(t *testing.T) { DataSource: &datasources.DataSource{ Uid: "Fake", }, + TimeRange: AbsoluteTimeRange{}, }, }, }, @@ -198,6 +200,7 @@ func TestServicebuildPipeLine(t *testing.T) { DataSource: &datasources.DataSource{ Uid: "Fake", }, + TimeRange: AbsoluteTimeRange{}, }, }, }, @@ -221,6 +224,7 @@ func TestServicebuildPipeLine(t *testing.T) { DataSource: &datasources.DataSource{ Uid: "Fake", }, + TimeRange: AbsoluteTimeRange{}, }, }, }, diff --git a/pkg/expr/nodes.go b/pkg/expr/nodes.go index 2bf89a0ff11..b63e6882280 100644 --- a/pkg/expr/nodes.go +++ b/pkg/expr/nodes.go @@ -92,8 +92,8 @@ func (gn *CMDNode) NodeType() NodeType { // Execute runs the node and adds the results to vars. If the node requires // other nodes they must have already been executed and their results must // already by in vars. -func (gn *CMDNode) Execute(ctx context.Context, vars mathexp.Vars, s *Service) (mathexp.Results, error) { - return gn.Command.Execute(ctx, vars) +func (gn *CMDNode) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, _ *Service) (mathexp.Results, error) { + return gn.Command.Execute(ctx, now, vars) } func buildCMDNode(dp *simple.DirectedGraph, rn *rawNode) (*CMDNode, error) { @@ -156,6 +156,9 @@ func (dn *DSNode) NodeType() NodeType { } func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Request) (*DSNode, error) { + if rn.TimeRange == nil { + return nil, fmt.Errorf("time range must be specified for refID %s", rn.RefID) + } encodedQuery, err := json.Marshal(rn.Query) if err != nil { return nil, err @@ -198,7 +201,7 @@ func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques // Execute runs the node and adds the results to vars. If the node requires // other nodes they must have already been executed and their results must // already by in vars. -func (dn *DSNode) Execute(ctx context.Context, vars mathexp.Vars, s *Service) (mathexp.Results, error) { +func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (mathexp.Results, error) { dsInstanceSettings, err := adapters.ModelToInstanceSettings(dn.datasource, s.decryptSecureJsonDataFn(ctx)) if err != nil { return mathexp.Results{}, fmt.Errorf("%v: %w", "failed to convert datasource instance settings", err) @@ -215,11 +218,8 @@ func (dn *DSNode) Execute(ctx context.Context, vars mathexp.Vars, s *Service) (m MaxDataPoints: dn.maxDP, Interval: time.Duration(int64(time.Millisecond) * dn.intervalMS), JSON: dn.query, - TimeRange: backend.TimeRange{ - From: dn.timeRange.From, - To: dn.timeRange.To, - }, - QueryType: dn.queryType, + TimeRange: dn.timeRange.AbsoluteTime(now), + QueryType: dn.queryType, }, } diff --git a/pkg/expr/service.go b/pkg/expr/service.go index b8c8f82311f..b4fe61a06f7 100644 --- a/pkg/expr/service.go +++ b/pkg/expr/service.go @@ -2,6 +2,7 @@ package expr import ( "context" + "time" "github.com/grafana/grafana-plugin-sdk-go/backend" @@ -62,9 +63,9 @@ func (s *Service) BuildPipeline(req *Request) (DataPipeline, error) { } // ExecutePipeline executes an expression pipeline and returns all the results. -func (s *Service) ExecutePipeline(ctx context.Context, pipeline DataPipeline) (*backend.QueryDataResponse, error) { +func (s *Service) ExecutePipeline(ctx context.Context, now time.Time, pipeline DataPipeline) (*backend.QueryDataResponse, error) { res := backend.NewQueryDataResponse() - vars, err := pipeline.execute(ctx, s) + vars, err := pipeline.execute(ctx, now, s) if err != nil { return nil, err } diff --git a/pkg/expr/service_test.go b/pkg/expr/service_test.go index accad46240f..a6973158c3d 100644 --- a/pkg/expr/service_test.go +++ b/pkg/expr/service_test.go @@ -43,6 +43,10 @@ func TestService(t *testing.T) { Type: "test", }, JSON: json.RawMessage(`{ "datasource": { "uid": "1" }, "intervalMs": 1000, "maxDataPoints": 1000 }`), + TimeRange: AbsoluteTimeRange{ + From: time.Time{}, + To: time.Time{}, + }, }, { RefID: "B", @@ -56,7 +60,7 @@ func TestService(t *testing.T) { pl, err := s.BuildPipeline(req) require.NoError(t, err) - res, err := s.ExecutePipeline(context.Background(), pl) + res, err := s.ExecutePipeline(context.Background(), time.Now(), pl) require.NoError(t, err) bDF := data.NewFrame("", diff --git a/pkg/expr/threshold.go b/pkg/expr/threshold.go index b3ffa819153..57f650fdca0 100644 --- a/pkg/expr/threshold.go +++ b/pkg/expr/threshold.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "strings" + "time" "github.com/grafana/grafana/pkg/expr/mathexp" ) @@ -88,7 +89,7 @@ func (tc *ThresholdCommand) NeedsVars() []string { return []string{tc.ReferenceVar} } -func (tc *ThresholdCommand) Execute(ctx context.Context, vars mathexp.Vars) (mathexp.Results, error) { +func (tc *ThresholdCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars) (mathexp.Results, error) { mathExpression, err := createMathExpression(tc.ReferenceVar, tc.ThresholdFunc, tc.Conditions) if err != nil { return mathexp.Results{}, err @@ -99,7 +100,7 @@ func (tc *ThresholdCommand) Execute(ctx context.Context, vars mathexp.Vars) (mat return mathexp.Results{}, err } - return mathCommand.Execute(ctx, vars) + return mathCommand.Execute(ctx, now, vars) } // createMathExpression converts all the info we have about a "threshold" expression in to a Math expression diff --git a/pkg/expr/threshold_test.go b/pkg/expr/threshold_test.go index 0b3717760dd..9bb87144b2e 100644 --- a/pkg/expr/threshold_test.go +++ b/pkg/expr/threshold_test.go @@ -93,7 +93,6 @@ func TestUnmarshalThresholdCommand(t *testing.T) { RefID: "", Query: qmap, QueryType: "", - TimeRange: TimeRange{}, DataSource: nil, }) diff --git a/pkg/expr/transform.go b/pkg/expr/transform.go index b3ae0de2532..a128ab9ec88 100644 --- a/pkg/expr/transform.go +++ b/pkg/expr/transform.go @@ -50,14 +50,38 @@ type Query struct { } // TimeRange is a time.Time based TimeRange. -type TimeRange struct { +type TimeRange interface { + AbsoluteTime(now time.Time) backend.TimeRange +} + +type AbsoluteTimeRange struct { From time.Time To time.Time } +func (r AbsoluteTimeRange) AbsoluteTime(_ time.Time) backend.TimeRange { + return backend.TimeRange{ + From: r.From, + To: r.To, + } +} + +// RelativeTimeRange is a time range relative to some absolute time. +type RelativeTimeRange struct { + From time.Duration + To time.Duration +} + +func (r RelativeTimeRange) AbsoluteTime(t time.Time) backend.TimeRange { + return backend.TimeRange{ + From: t.Add(r.From), + To: t.Add(r.To), + } +} + // TransformData takes Queries which are either expressions nodes // or are datasource requests. -func (s *Service) TransformData(ctx context.Context, req *Request) (r *backend.QueryDataResponse, err error) { +func (s *Service) TransformData(ctx context.Context, now time.Time, req *Request) (r *backend.QueryDataResponse, err error) { if s.isDisabled() { return nil, fmt.Errorf("server side expressions are disabled") } @@ -83,7 +107,7 @@ func (s *Service) TransformData(ctx context.Context, req *Request) (r *backend.Q } // Execute the pipeline - responses, err := s.ExecutePipeline(ctx, pipeline) + responses, err := s.ExecutePipeline(ctx, now, pipeline) if err != nil { return nil, err } diff --git a/pkg/services/ngalert/eval/eval.go b/pkg/services/ngalert/eval/eval.go index 755ae5be33b..24e83b6e6b5 100644 --- a/pkg/services/ngalert/eval/eval.go +++ b/pkg/services/ngalert/eval/eval.go @@ -215,10 +215,7 @@ func getExprRequest(ctx EvaluationContext, data []models.AlertQuery, dsCacheServ } req.Queries = append(req.Queries, expr.Query{ - TimeRange: expr.TimeRange{ - From: q.RelativeTimeRange.ToTimeRange(ctx.At).From, - To: q.RelativeTimeRange.ToTimeRange(ctx.At).To, - }, + TimeRange: q.RelativeTimeRange.ToTimeRange(), DataSource: ds, JSON: model, Interval: interval, @@ -334,7 +331,7 @@ func executeQueriesAndExpressions(ctx EvaluationContext, data []models.AlertQuer return nil, err } - return exprService.TransformData(ctx.Ctx, queryDataReq) + return exprService.TransformData(ctx.Ctx, ctx.At, queryDataReq) } // datasourceUIDsToRefIDs returns a sorted slice of Ref IDs for each Datasource UID. diff --git a/pkg/services/ngalert/models/alert_query.go b/pkg/services/ngalert/models/alert_query.go index 503f349362d..4f2d84f857c 100644 --- a/pkg/services/ngalert/models/alert_query.go +++ b/pkg/services/ngalert/models/alert_query.go @@ -6,7 +6,6 @@ import ( "fmt" "time" - "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana/pkg/expr" ) @@ -70,10 +69,10 @@ func (rtr *RelativeTimeRange) isValid() bool { return rtr.From > rtr.To } -func (rtr *RelativeTimeRange) ToTimeRange(now time.Time) backend.TimeRange { - return backend.TimeRange{ - From: now.Add(-time.Duration(rtr.From)), - To: now.Add(-time.Duration(rtr.To)), +func (rtr *RelativeTimeRange) ToTimeRange() expr.TimeRange { + return expr.RelativeTimeRange{ + From: -time.Duration(rtr.From), + To: -time.Duration(rtr.To), } } diff --git a/pkg/services/query/query.go b/pkg/services/query/query.go index 2cb4405274b..390e6d2c9f4 100644 --- a/pkg/services/query/query.go +++ b/pkg/services/query/query.go @@ -144,14 +144,14 @@ func (s *Service) handleExpressions(ctx context.Context, user *user.SignedInUser MaxDataPoints: pq.query.MaxDataPoints, QueryType: pq.query.QueryType, DataSource: pq.datasource, - TimeRange: expr.TimeRange{ + TimeRange: expr.AbsoluteTimeRange{ From: pq.query.TimeRange.From, To: pq.query.TimeRange.To, }, }) } - qdr, err := s.expressionService.TransformData(ctx, &exprReq) + qdr, err := s.expressionService.TransformData(ctx, time.Now(), &exprReq) // use time now because all queries have absolute time range if err != nil { return nil, fmt.Errorf("expression request error: %w", err) }