mirror of
https://github.com/grafana/grafana.git
synced 2024-11-23 01:16:31 -06:00
parent
f3fcc31277
commit
840fb32ad8
@ -10,6 +10,7 @@ import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
)
|
||||
|
||||
// ConditionsCmd is a command that supports the reduction and comparison of conditions.
|
||||
@ -68,7 +69,9 @@ func (cmd *ConditionsCmd) NeedsVars() []string {
|
||||
|
||||
// Execute runs the command and returns the results or an error if the command
|
||||
// failed to execute.
|
||||
func (cmd *ConditionsCmd) Execute(ctx context.Context, t time.Time, vars mathexp.Vars) (mathexp.Results, error) {
|
||||
func (cmd *ConditionsCmd) Execute(ctx context.Context, t time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) {
|
||||
ctx, span := tracer.Start(ctx, "SSE.ExecuteClassicConditions")
|
||||
defer span.End()
|
||||
// isFiring and isNoData contains the outcome of ConditionsCmd, and is derived from the
|
||||
// boolean comparison of isCondFiring and isCondNoData of all conditions in ConditionsCmd
|
||||
var isFiring, isNoData bool
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
)
|
||||
|
||||
@ -598,7 +599,7 @@ func TestConditionsCmd(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
res, err := tt.cmd.Execute(context.Background(), time.Now(), tt.vars)
|
||||
res, err := tt.cmd.Execute(context.Background(), time.Now(), tt.vars, tracing.NewFakeTracer())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.expected(), res)
|
||||
})
|
||||
|
@ -9,14 +9,16 @@ import (
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/gtime"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
)
|
||||
|
||||
// Command is an interface for all expression commands.
|
||||
type Command interface {
|
||||
NeedsVars() []string
|
||||
Execute(ctx context.Context, now time.Time, vars mathexp.Vars) (mathexp.Results, error)
|
||||
Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error)
|
||||
}
|
||||
|
||||
// MathCommand is a command for a math expression such as "1 + $GA / 2"
|
||||
@ -66,8 +68,11 @@ func (gm *MathCommand) NeedsVars() []string {
|
||||
|
||||
// Execute runs the command and returns the results or an error if the command
|
||||
// failed to execute.
|
||||
func (gm *MathCommand) Execute(_ context.Context, _ time.Time, vars mathexp.Vars) (mathexp.Results, error) {
|
||||
return gm.Expression.Execute(gm.refID, vars)
|
||||
func (gm *MathCommand) Execute(ctx context.Context, _ time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) {
|
||||
_, span := tracer.Start(ctx, "SSE.ExecuteMath")
|
||||
span.SetAttributes("expression", gm.RawExpression, attribute.Key("expression").String(gm.RawExpression))
|
||||
defer span.End()
|
||||
return gm.Expression.Execute(gm.refID, vars, tracer)
|
||||
}
|
||||
|
||||
// ReduceCommand is an expression command for reduction of a timeseries such as a min, mean, or max.
|
||||
@ -154,7 +159,12 @@ func (gr *ReduceCommand) NeedsVars() []string {
|
||||
|
||||
// Execute runs the command and returns the results or an error if the command
|
||||
// failed to execute.
|
||||
func (gr *ReduceCommand) Execute(_ context.Context, _ time.Time, vars mathexp.Vars) (mathexp.Results, error) {
|
||||
func (gr *ReduceCommand) Execute(ctx context.Context, _ time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) {
|
||||
_, span := tracer.Start(ctx, "SSE.ExecuteReduce")
|
||||
defer span.End()
|
||||
|
||||
span.SetAttributes("reducer", gr.Reducer, attribute.Key("reducer").String(gr.Reducer))
|
||||
|
||||
newRes := mathexp.Results{}
|
||||
for _, val := range vars[gr.VarToReduce].Values {
|
||||
switch v := val.(type) {
|
||||
@ -262,7 +272,9 @@ func (gr *ResampleCommand) NeedsVars() []string {
|
||||
|
||||
// Execute runs the command and returns the results or an error if the command
|
||||
// failed to execute.
|
||||
func (gr *ResampleCommand) Execute(_ context.Context, now time.Time, vars mathexp.Vars) (mathexp.Results, error) {
|
||||
func (gr *ResampleCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) {
|
||||
_, span := tracer.Start(ctx, "SSE.ExecuteResample")
|
||||
defer span.End()
|
||||
newRes := mathexp.Results{}
|
||||
timeRange := gr.TimeRange.AbsoluteTime(now)
|
||||
for _, val := range vars[gr.VarToResample].Values {
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp/parse"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/util"
|
||||
)
|
||||
|
||||
@ -115,7 +116,7 @@ func TestReduceExecute(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
execute, err := cmd.Execute(context.Background(), time.Now(), vars)
|
||||
execute, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.NewFakeTracer())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, execute.Values, len(numbers))
|
||||
@ -150,7 +151,7 @@ func TestReduceExecute(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
results, err := cmd.Execute(context.Background(), time.Now(), vars)
|
||||
results, err := cmd.Execute(context.Background(), time.Now(), vars, tracing.NewFakeTracer())
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, results.Values, 1)
|
||||
@ -209,7 +210,7 @@ func TestResampleCommand_Execute(t *testing.T) {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
result, err := cmd.Execute(context.Background(), time.Now(), mathexp.Vars{
|
||||
varToReduce: mathexp.Results{Values: mathexp.Values{test.vals}},
|
||||
})
|
||||
}, tracing.NewFakeTracer())
|
||||
if test.isError {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
@ -224,7 +225,7 @@ func TestResampleCommand_Execute(t *testing.T) {
|
||||
t.Run("should return empty result if input is nil Value", func(t *testing.T) {
|
||||
result, err := cmd.Execute(context.Background(), time.Now(), mathexp.Vars{
|
||||
varToReduce: mathexp.Results{Values: mathexp.Values{nil}},
|
||||
})
|
||||
}, tracing.NewFakeTracer())
|
||||
require.Empty(t, result.Values)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
@ -1,6 +1,7 @@
|
||||
package expr
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
@ -11,6 +12,8 @@ import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
)
|
||||
|
||||
func shouldUseDataplane(frames data.Frames, logger *log.ConcreteLogger, disable bool) (dt data.FrameType, b bool, e error) {
|
||||
@ -55,8 +58,12 @@ func shouldUseDataplane(frames data.Frames, logger *log.ConcreteLogger, disable
|
||||
return dt, true, nil
|
||||
}
|
||||
|
||||
func handleDataplaneFrames(k data.FrameTypeKind, frames data.Frames) (mathexp.Results, error) {
|
||||
switch k {
|
||||
func handleDataplaneFrames(ctx context.Context, tracer tracing.Tracer, t data.FrameType, frames data.Frames) (mathexp.Results, error) {
|
||||
_, span := tracer.Start(ctx, "SSE.HandleDataPlaneData")
|
||||
defer span.End()
|
||||
span.SetAttributes("dataplane.type", t, attribute.Key("dataplane.type").String(string(t)))
|
||||
|
||||
switch t.Kind() {
|
||||
case data.KindUnknown:
|
||||
return mathexp.Results{Values: mathexp.Values{mathexp.NoData{}.New()}}, nil
|
||||
case data.KindTimeSeries:
|
||||
@ -64,7 +71,7 @@ func handleDataplaneFrames(k data.FrameTypeKind, frames data.Frames) (mathexp.Re
|
||||
case data.KindNumeric:
|
||||
return handleDataplaneNumeric(frames)
|
||||
default:
|
||||
return mathexp.Results{}, fmt.Errorf("kind %s not supported by server side expressions", k)
|
||||
return mathexp.Results{}, fmt.Errorf("kind %s (type %s) not supported by server side expressions", t.Kind(), t)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"github.com/grafana/dataplane/examples"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
datafakes "github.com/grafana/grafana/pkg/services/datasources/fakes"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
@ -49,6 +50,7 @@ func framesPassThroughService(t *testing.T, frames data.Frames) (data.Frames, er
|
||||
dataService: me,
|
||||
dataSourceService: &datafakes.FakeDataSourceService{},
|
||||
features: &featuremgmt.FeatureManager{},
|
||||
tracer: tracing.InitializeTracerForTest(),
|
||||
metrics: newMetrics(nil),
|
||||
}
|
||||
queries := []Query{{
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"gonum.org/v1/gonum/graph/simple"
|
||||
"gonum.org/v1/gonum/graph/topo"
|
||||
|
||||
@ -50,6 +51,15 @@ type DataPipeline []Node
|
||||
func (dp *DataPipeline) execute(c context.Context, now time.Time, s *Service) (mathexp.Vars, error) {
|
||||
vars := make(mathexp.Vars)
|
||||
for _, node := range *dp {
|
||||
c, span := s.tracer.Start(c, "SSE.ExecuteNode")
|
||||
span.SetAttributes("node.refId", node.RefID(), attribute.Key("node.refId").String(node.RefID()))
|
||||
if node.NodeType() == TypeCMDNode {
|
||||
cmdNode := node.(*CMDNode)
|
||||
inputRefIDs := cmdNode.Command.NeedsVars()
|
||||
span.SetAttributes("node.inputRefIDs", inputRefIDs, attribute.Key("node.inputRefIDs").StringSlice(inputRefIDs))
|
||||
}
|
||||
defer span.End()
|
||||
|
||||
res, err := node.Execute(c, now, vars, s)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -9,6 +9,7 @@ import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp/parse"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
)
|
||||
|
||||
// Expr holds a parsed math command expression.
|
||||
@ -25,6 +26,8 @@ type State struct {
|
||||
// - Unions (How many result A and many Result B in case A + B are joined)
|
||||
// - NaN/Null behavior
|
||||
RefID string
|
||||
|
||||
tracer tracing.Tracer
|
||||
}
|
||||
|
||||
// Vars holds the results of datasource queries or other expression commands.
|
||||
@ -44,11 +47,13 @@ func New(expr string, funcs ...map[string]parse.Func) (*Expr, error) {
|
||||
}
|
||||
|
||||
// Execute applies a parse expression to the context and executes it
|
||||
func (e *Expr) Execute(refID string, vars Vars) (r Results, err error) {
|
||||
func (e *Expr) Execute(refID string, vars Vars, tracer tracing.Tracer) (r Results, err error) {
|
||||
s := &State{
|
||||
Expr: e,
|
||||
Vars: vars,
|
||||
RefID: refID,
|
||||
|
||||
tracer: tracer,
|
||||
}
|
||||
return e.executeState(s)
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -147,7 +148,7 @@ func TestNaN(t *testing.T) {
|
||||
e, err := New(tt.expr)
|
||||
tt.newErrIs(t, err)
|
||||
if e != nil {
|
||||
res, err := e.Execute("", tt.vars)
|
||||
res, err := e.Execute("", tt.vars, tracing.NewFakeTracer())
|
||||
tt.execErrIs(t, err)
|
||||
if diff := cmp.Diff(res, tt.results, options...); diff != "" {
|
||||
assert.FailNow(t, tt.name, diff)
|
||||
@ -409,7 +410,7 @@ func TestNullValues(t *testing.T) {
|
||||
e, err := New(tt.expr)
|
||||
tt.newErrIs(t, err)
|
||||
if e != nil {
|
||||
res, err := e.Execute("", tt.vars)
|
||||
res, err := e.Execute("", tt.vars, tracing.NewFakeTracer())
|
||||
tt.execErrIs(t, err)
|
||||
if diff := cmp.Diff(tt.results, res, options...); diff != "" {
|
||||
t.Errorf("Result mismatch (-want +got):\n%s", diff)
|
||||
@ -446,7 +447,7 @@ func TestNoData(t *testing.T) {
|
||||
e, err := New(expr)
|
||||
require.NoError(t, err)
|
||||
if e != nil {
|
||||
res, err := e.Execute("", vars)
|
||||
res, err := e.Execute("", vars, tracing.NewFakeTracer())
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res.Values, 1)
|
||||
require.Equal(t, NewNoData(), res.Values[0])
|
||||
@ -487,21 +488,21 @@ func TestNoData(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
if e != nil {
|
||||
t.Run("$A,$B=nodata", func(t *testing.T) {
|
||||
res, err := e.Execute("", makeVars(NewNoData(), NewNoData()))
|
||||
res, err := e.Execute("", makeVars(NewNoData(), NewNoData()), tracing.NewFakeTracer())
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res.Values, 1)
|
||||
require.Equal(t, NewNoData(), res.Values[0])
|
||||
})
|
||||
|
||||
t.Run("$A=nodata, $B=series", func(t *testing.T) {
|
||||
res, err := e.Execute("", makeVars(NewNoData(), series))
|
||||
res, err := e.Execute("", makeVars(NewNoData(), series), tracing.NewFakeTracer())
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res.Values, 1)
|
||||
require.Equal(t, NewNoData(), res.Values[0])
|
||||
})
|
||||
|
||||
t.Run("$A=series, $B=nodata", func(t *testing.T) {
|
||||
res, err := e.Execute("", makeVars(NewNoData(), series))
|
||||
res, err := e.Execute("", makeVars(NewNoData(), series), tracing.NewFakeTracer())
|
||||
require.NoError(t, err)
|
||||
require.Len(t, res.Values, 1)
|
||||
require.Equal(t, NewNoData(), res.Values[0])
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
@ -78,7 +79,7 @@ func TestScalarExpr(t *testing.T) {
|
||||
e, err := New(tt.expr)
|
||||
tt.newErrIs(t, err)
|
||||
if e != nil {
|
||||
res, err := e.Execute("", tt.vars)
|
||||
res, err := e.Execute("", tt.vars, tracing.NewFakeTracer())
|
||||
tt.execErrIs(t, err)
|
||||
tt.resultIs(t, tt.Results, res)
|
||||
}
|
||||
@ -131,7 +132,7 @@ func TestNumberExpr(t *testing.T) {
|
||||
e, err := New(tt.expr)
|
||||
tt.newErrIs(t, err)
|
||||
if e != nil {
|
||||
res, err := e.Execute("", tt.vars)
|
||||
res, err := e.Execute("", tt.vars, tracing.NewFakeTracer())
|
||||
tt.execErrIs(t, err)
|
||||
tt.resultIs(t, tt.results, res)
|
||||
}
|
||||
|
@ -6,6 +6,7 @@ import (
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
@ -178,7 +179,7 @@ func TestSeriesExpr(t *testing.T) {
|
||||
e, err := New(tt.expr)
|
||||
tt.newErrIs(t, err)
|
||||
if e != nil {
|
||||
res, err := e.Execute("", tt.vars)
|
||||
res, err := e.Execute("", tt.vars, tracing.NewFakeTracer())
|
||||
tt.execErrIs(t, err)
|
||||
if diff := cmp.Diff(tt.results, res, data.FrameTestCompareOptions()...); diff != "" {
|
||||
t.Errorf("Result mismatch (-want +got):\n%s", diff)
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@ -81,7 +82,7 @@ func TestAbsFunc(t *testing.T) {
|
||||
e, err := New(tt.expr)
|
||||
tt.newErrIs(t, err)
|
||||
if e != nil {
|
||||
res, err := e.Execute("", tt.vars)
|
||||
res, err := e.Execute("", tt.vars, tracing.NewFakeTracer())
|
||||
tt.execErrIs(t, err)
|
||||
tt.resultIs(t, tt.results, res)
|
||||
}
|
||||
@ -152,7 +153,7 @@ func TestIsNumberFunc(t *testing.T) {
|
||||
e, err := New(tt.expr)
|
||||
require.NoError(t, err)
|
||||
if e != nil {
|
||||
res, err := e.Execute("", tt.vars)
|
||||
res, err := e.Execute("", tt.vars, tracing.NewFakeTracer())
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.results, res)
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"gonum.org/v1/gonum/graph/simple"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/classic"
|
||||
@ -92,8 +93,8 @@ func (gn *CMDNode) NodeType() NodeType {
|
||||
// Execute runs the node and adds the results to vars. If the node requires
|
||||
// other nodes they must have already been executed and their results must
|
||||
// already by in vars.
|
||||
func (gn *CMDNode) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, _ *Service) (mathexp.Results, error) {
|
||||
return gn.Command.Execute(ctx, now, vars)
|
||||
func (gn *CMDNode) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service) (mathexp.Results, error) {
|
||||
return gn.Command.Execute(ctx, now, vars, s.tracer)
|
||||
}
|
||||
|
||||
func buildCMDNode(dp *simple.DirectedGraph, rn *rawNode) (*CMDNode, error) {
|
||||
@ -203,6 +204,9 @@ func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Reques
|
||||
// already by in vars.
|
||||
func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) {
|
||||
logger := logger.FromContext(ctx).New("datasourceType", dn.datasource.Type, "queryRefId", dn.refID, "datasourceUid", dn.datasource.UID, "datasourceVersion", dn.datasource.Version)
|
||||
ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery")
|
||||
defer span.End()
|
||||
|
||||
dsInstanceSettings, err := adapters.ModelToInstanceSettings(dn.datasource, s.decryptSecureJsonDataFn(ctx))
|
||||
if err != nil {
|
||||
return mathexp.Results{}, fmt.Errorf("%v: %w", "failed to convert datasource instance settings", err)
|
||||
@ -213,6 +217,7 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
|
||||
PluginID: dn.datasource.Type,
|
||||
User: dn.request.User,
|
||||
}
|
||||
span.SetAttributes("datasource.type", dn.datasource.Type, attribute.Key("datasource.type").String(dn.datasource.Type))
|
||||
|
||||
req := &backend.QueryDataRequest{
|
||||
PluginContext: pc,
|
||||
@ -268,7 +273,7 @@ func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s
|
||||
dt, useDataplane, _ = shouldUseDataplane(response.Frames, logger, s.features.IsEnabled(featuremgmt.FlagDisableSSEDataplane))
|
||||
if useDataplane {
|
||||
logger.Debug("Handling SSE data source query through dataplane", "datatype", dt)
|
||||
return handleDataplaneFrames(dt.Kind(), response.Frames)
|
||||
return handleDataplaneFrames(ctx, s.tracer, dt, response.Frames)
|
||||
}
|
||||
|
||||
dataSource := dn.datasource.Type
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
@ -44,15 +45,17 @@ type Service struct {
|
||||
dataSourceService datasources.DataSourceService
|
||||
features featuremgmt.FeatureToggles
|
||||
|
||||
tracer tracing.Tracer
|
||||
metrics *metrics
|
||||
}
|
||||
|
||||
func ProvideService(cfg *setting.Cfg, pluginClient plugins.Client, dataSourceService datasources.DataSourceService, features featuremgmt.FeatureToggles, registerer prometheus.Registerer) *Service {
|
||||
func ProvideService(cfg *setting.Cfg, pluginClient plugins.Client, dataSourceService datasources.DataSourceService, features featuremgmt.FeatureToggles, registerer prometheus.Registerer, tracer tracing.Tracer) *Service {
|
||||
return &Service{
|
||||
cfg: cfg,
|
||||
dataService: pluginClient,
|
||||
dataSourceService: dataSourceService,
|
||||
features: features,
|
||||
tracer: tracer,
|
||||
metrics: newMetrics(registerer),
|
||||
}
|
||||
}
|
||||
@ -71,6 +74,8 @@ func (s *Service) BuildPipeline(req *Request) (DataPipeline, error) {
|
||||
|
||||
// ExecutePipeline executes an expression pipeline and returns all the results.
|
||||
func (s *Service) ExecutePipeline(ctx context.Context, now time.Time, pipeline DataPipeline) (*backend.QueryDataResponse, error) {
|
||||
ctx, span := s.tracer.Start(ctx, "SSE.ExecutePipeline")
|
||||
defer span.End()
|
||||
res := backend.NewQueryDataResponse()
|
||||
vars, err := pipeline.execute(ctx, now, s)
|
||||
if err != nil {
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
datafakes "github.com/grafana/grafana/pkg/services/datasources/fakes"
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
@ -34,6 +35,7 @@ func TestService(t *testing.T) {
|
||||
dataService: me,
|
||||
dataSourceService: &datafakes.FakeDataSourceService{},
|
||||
features: &featuremgmt.FeatureManager{},
|
||||
tracer: tracing.InitializeTracerForTest(),
|
||||
metrics: newMetrics(nil),
|
||||
}
|
||||
|
||||
|
@ -8,6 +8,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr/mathexp"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
)
|
||||
|
||||
type ThresholdCommand struct {
|
||||
@ -89,7 +90,7 @@ func (tc *ThresholdCommand) NeedsVars() []string {
|
||||
return []string{tc.ReferenceVar}
|
||||
}
|
||||
|
||||
func (tc *ThresholdCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars) (mathexp.Results, error) {
|
||||
func (tc *ThresholdCommand) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, tracer tracing.Tracer) (mathexp.Results, error) {
|
||||
mathExpression, err := createMathExpression(tc.ReferenceVar, tc.ThresholdFunc, tc.Conditions)
|
||||
if err != nil {
|
||||
return mathexp.Results{}, err
|
||||
@ -100,7 +101,7 @@ func (tc *ThresholdCommand) Execute(ctx context.Context, now time.Time, vars mat
|
||||
return mathexp.Results{}, err
|
||||
}
|
||||
|
||||
return mathCommand.Execute(ctx, now, vars)
|
||||
return mathCommand.Execute(ctx, now, vars, tracer)
|
||||
}
|
||||
|
||||
// createMathExpression converts all the info we have about a "threshold" expression in to a Math expression
|
||||
|
@ -70,6 +70,7 @@ func (s *Service) TransformData(ctx context.Context, now time.Time, req *Request
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
ctx, span := s.tracer.Start(ctx, "SSE.TransformData")
|
||||
defer func() {
|
||||
var respStatus string
|
||||
switch {
|
||||
@ -80,6 +81,8 @@ func (s *Service) TransformData(ctx context.Context, now time.Time, req *Request
|
||||
}
|
||||
duration := float64(time.Since(start).Nanoseconds()) / float64(time.Millisecond)
|
||||
s.metrics.expressionsQuerySummary.WithLabelValues(respStatus).Observe(duration)
|
||||
|
||||
span.End()
|
||||
}()
|
||||
|
||||
// Build the pipeline from the request, checking for ordering issues (e.g. loops)
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/expr"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
"github.com/grafana/grafana/pkg/services/datasources"
|
||||
fakes "github.com/grafana/grafana/pkg/services/datasources/fakes"
|
||||
@ -533,7 +534,7 @@ func TestValidate(t *testing.T) {
|
||||
pluginsStore: store,
|
||||
})
|
||||
|
||||
evaluator := NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, cacheService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, &featuremgmt.FeatureManager{}, nil), store)
|
||||
evaluator := NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, cacheService, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, &featuremgmt.FeatureManager{}, nil, tracing.InitializeTracerForTest()), store)
|
||||
evalCtx := NewContext(context.Background(), u)
|
||||
|
||||
err := evaluator.Validate(evalCtx, condition)
|
||||
|
@ -781,7 +781,7 @@ func setupScheduler(t *testing.T, rs *fakeRulesStore, is *state.FakeInstanceStor
|
||||
|
||||
var evaluator = evalMock
|
||||
if evalMock == nil {
|
||||
evaluator = eval.NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, nil, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, &featuremgmt.FeatureManager{}, nil), &plugins.FakePluginStore{})
|
||||
evaluator = eval.NewEvaluatorFactory(setting.UnifiedAlertingSettings{}, nil, expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, nil, nil, &featuremgmt.FeatureManager{}, nil, tracing.InitializeTracerForTest()), &plugins.FakePluginStore{})
|
||||
}
|
||||
|
||||
if registry == nil {
|
||||
|
@ -17,6 +17,7 @@ import (
|
||||
"github.com/grafana/grafana/pkg/expr"
|
||||
"github.com/grafana/grafana/pkg/infra/db"
|
||||
"github.com/grafana/grafana/pkg/infra/log"
|
||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||
"github.com/grafana/grafana/pkg/models/roletype"
|
||||
"github.com/grafana/grafana/pkg/plugins"
|
||||
acmock "github.com/grafana/grafana/pkg/services/accesscontrol/mock"
|
||||
@ -446,7 +447,7 @@ func setup(t *testing.T) *testContext {
|
||||
DataSources: nil,
|
||||
SimulatePluginFailure: false,
|
||||
}
|
||||
exprService := expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, pc, fakeDatasourceService, &featuremgmt.FeatureManager{}, nil)
|
||||
exprService := expr.ProvideService(&setting.Cfg{ExpressionsEnabled: true}, pc, fakeDatasourceService, &featuremgmt.FeatureManager{}, nil, tracing.InitializeTracerForTest())
|
||||
queryService := ProvideService(setting.NewCfg(), dc, exprService, rv, ds, pc) // provider belonging to this package
|
||||
return &testContext{
|
||||
pluginContext: pc,
|
||||
|
Loading…
Reference in New Issue
Block a user