mirror of
https://github.com/grafana/grafana.git
synced 2025-01-02 12:17:01 -06:00
82aa000e9d
Improves log line to help with debugging in Server Side Expressions. In particular, the traceId, datasourceType, and datasourceUid will now be included.
410 lines
12 KiB
Go
410 lines
12 KiB
Go
package expr
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
"github.com/grafana/grafana-plugin-sdk-go/data/utils/jsoniter"
|
|
data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
"gonum.org/v1/gonum/graph/simple"
|
|
|
|
"github.com/grafana/grafana/pkg/expr/classic"
|
|
"github.com/grafana/grafana/pkg/expr/mathexp"
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
"github.com/grafana/grafana/pkg/services/datasources"
|
|
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
|
)
|
|
|
|
// label that is used when all mathexp.Series have 0 labels to make them identifiable by labels. The value of this label is extracted from value field names
|
|
const nameLabelName = "__name__"
|
|
|
|
var (
|
|
logger = log.New("expr")
|
|
)
|
|
|
|
// baseNode includes common properties used across DPNodes.
|
|
type baseNode struct {
|
|
id int64
|
|
refID string
|
|
}
|
|
|
|
type rawNode struct {
|
|
RefID string `json:"refId"`
|
|
Query map[string]any
|
|
QueryRaw []byte
|
|
QueryType string
|
|
TimeRange TimeRange
|
|
DataSource *datasources.DataSource
|
|
// We use this index as the id of the node graph so the order can remain during a the stable sort of the dependency graph execution order.
|
|
// Some data sources, such as cloud watch, have order dependencies between queries.
|
|
idx int64
|
|
}
|
|
|
|
func getExpressionCommandTypeString(rawQuery map[string]any) (string, error) {
|
|
rawType, ok := rawQuery["type"]
|
|
if !ok {
|
|
return "", errors.New("no expression command type in query")
|
|
}
|
|
typeString, ok := rawType.(string)
|
|
if !ok {
|
|
return "", fmt.Errorf("expected expression command type to be a string, got type %T", rawType)
|
|
}
|
|
return typeString, nil
|
|
}
|
|
|
|
func GetExpressionCommandType(rawQuery map[string]any) (c CommandType, err error) {
|
|
typeString, err := getExpressionCommandTypeString(rawQuery)
|
|
if err != nil {
|
|
return c, err
|
|
}
|
|
return ParseCommandType(typeString)
|
|
}
|
|
|
|
// String returns a string representation of the node. In particular for
|
|
// %v formatting in error messages.
|
|
func (b *baseNode) String() string {
|
|
return b.refID
|
|
}
|
|
|
|
// CMDNode is a DPNode that holds an expression command.
|
|
type CMDNode struct {
|
|
baseNode
|
|
CMDType CommandType
|
|
Command Command
|
|
}
|
|
|
|
// ID returns the id of the node so it can fulfill the gonum's graph Node interface.
|
|
func (b *baseNode) ID() int64 {
|
|
return b.id
|
|
}
|
|
|
|
// RefID returns the refId of the node.
|
|
func (b *baseNode) RefID() string {
|
|
return b.refID
|
|
}
|
|
|
|
// NodeType returns the data pipeline node type.
|
|
func (gn *CMDNode) NodeType() NodeType {
|
|
return TypeCMDNode
|
|
}
|
|
|
|
func (gn *CMDNode) NeedsVars() []string {
|
|
return gn.Command.NeedsVars()
|
|
}
|
|
|
|
// Execute runs the node and adds the results to vars. If the node requires
|
|
// other nodes they must have already been executed and their results must
|
|
// already by in vars.
|
|
func (gn *CMDNode) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service) (mathexp.Results, error) {
|
|
return gn.Command.Execute(ctx, now, vars, s.tracer)
|
|
}
|
|
|
|
func buildCMDNode(rn *rawNode, toggles featuremgmt.FeatureToggles) (*CMDNode, error) {
|
|
commandType, err := GetExpressionCommandType(rn.Query)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid command type in expression '%v': %w", rn.RefID, err)
|
|
}
|
|
|
|
node := &CMDNode{
|
|
baseNode: baseNode{
|
|
id: rn.idx,
|
|
refID: rn.RefID,
|
|
},
|
|
CMDType: commandType,
|
|
}
|
|
|
|
if toggles.IsEnabledGlobally(featuremgmt.FlagExpressionParser) {
|
|
rn.QueryType, err = getExpressionCommandTypeString(rn.Query)
|
|
if err != nil {
|
|
return nil, err // should not happen because the command was parsed first thing
|
|
}
|
|
|
|
// NOTE: this structure of this is weird now, because it is targeting a structure
|
|
// where this is actually run in the root loop, however we want to verify the individual
|
|
// node parsing before changing the full tree parser
|
|
reader := NewExpressionQueryReader(toggles)
|
|
iter, err := jsoniter.ParseBytes(jsoniter.ConfigDefault, rn.QueryRaw)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
q, err := reader.ReadQuery(data.NewDataQuery(map[string]any{
|
|
"refId": rn.RefID,
|
|
"type": rn.QueryType,
|
|
}), iter)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
node.Command = q.Command
|
|
return node, err
|
|
}
|
|
|
|
switch commandType {
|
|
case TypeMath:
|
|
node.Command, err = UnmarshalMathCommand(rn)
|
|
case TypeReduce:
|
|
node.Command, err = UnmarshalReduceCommand(rn)
|
|
case TypeResample:
|
|
node.Command, err = UnmarshalResampleCommand(rn)
|
|
case TypeClassicConditions:
|
|
node.Command, err = classic.UnmarshalConditionsCmd(rn.Query, rn.RefID)
|
|
case TypeThreshold:
|
|
node.Command, err = UnmarshalThresholdCommand(rn, toggles)
|
|
case TypeSQL:
|
|
node.Command, err = UnmarshalSQLCommand(rn)
|
|
default:
|
|
return nil, fmt.Errorf("expression command type '%v' in expression '%v' not implemented", commandType, rn.RefID)
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to parse expression '%v': %w", rn.RefID, err)
|
|
}
|
|
|
|
return node, nil
|
|
}
|
|
|
|
const (
|
|
defaultIntervalMS = int64(64)
|
|
defaultMaxDP = int64(5000)
|
|
)
|
|
|
|
// DSNode is a DPNode that holds a datasource request.
|
|
type DSNode struct {
|
|
baseNode
|
|
query json.RawMessage
|
|
datasource *datasources.DataSource
|
|
|
|
orgID int64
|
|
queryType string
|
|
timeRange TimeRange
|
|
intervalMS int64
|
|
maxDP int64
|
|
request Request
|
|
}
|
|
|
|
func (dn *DSNode) String() string {
|
|
if dn.datasource == nil {
|
|
return "unknown"
|
|
}
|
|
return dn.datasource.Type
|
|
}
|
|
|
|
// NodeType returns the data pipeline node type.
|
|
func (dn *DSNode) NodeType() NodeType {
|
|
return TypeDatasourceNode
|
|
}
|
|
|
|
// NodeType returns the data pipeline node type.
|
|
func (dn *DSNode) NeedsVars() []string {
|
|
return []string{}
|
|
}
|
|
|
|
func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Request) (*DSNode, error) {
|
|
if rn.TimeRange == nil {
|
|
return nil, fmt.Errorf("time range must be specified for refID %s", rn.RefID)
|
|
}
|
|
encodedQuery, err := json.Marshal(rn.Query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dsNode := &DSNode{
|
|
baseNode: baseNode{
|
|
id: rn.idx,
|
|
refID: rn.RefID,
|
|
},
|
|
orgID: req.OrgId,
|
|
query: json.RawMessage(encodedQuery),
|
|
queryType: rn.QueryType,
|
|
intervalMS: defaultIntervalMS,
|
|
maxDP: defaultMaxDP,
|
|
timeRange: rn.TimeRange,
|
|
request: *req,
|
|
datasource: rn.DataSource,
|
|
}
|
|
|
|
var floatIntervalMS float64
|
|
if rawIntervalMS, ok := rn.Query["intervalMs"]; ok {
|
|
if floatIntervalMS, ok = rawIntervalMS.(float64); !ok {
|
|
return nil, fmt.Errorf("expected intervalMs to be an float64, got type %T for refId %v", rawIntervalMS, rn.RefID)
|
|
}
|
|
dsNode.intervalMS = int64(floatIntervalMS)
|
|
}
|
|
|
|
var floatMaxDP float64
|
|
if rawMaxDP, ok := rn.Query["maxDataPoints"]; ok {
|
|
if floatMaxDP, ok = rawMaxDP.(float64); !ok {
|
|
return nil, fmt.Errorf("expected maxDataPoints to be an float64, got type %T for refId %v", rawMaxDP, rn.RefID)
|
|
}
|
|
dsNode.maxDP = int64(floatMaxDP)
|
|
}
|
|
|
|
return dsNode, nil
|
|
}
|
|
|
|
// executeDSNodesGrouped groups datasource node queries by the datasource instance, and then sends them
|
|
// in a single request with one or more queries to the datasource.
|
|
func executeDSNodesGrouped(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service, nodes []*DSNode) {
|
|
type dsKey struct {
|
|
uid string // in theory I think this all I need for the key, but rather be safe
|
|
id int64
|
|
orgID int64
|
|
}
|
|
byDS := make(map[dsKey][]*DSNode)
|
|
for _, node := range nodes {
|
|
k := dsKey{id: node.datasource.ID, uid: node.datasource.UID, orgID: node.orgID}
|
|
byDS[k] = append(byDS[k], node)
|
|
}
|
|
|
|
for _, nodeGroup := range byDS {
|
|
func() {
|
|
ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery")
|
|
defer span.End()
|
|
firstNode := nodeGroup[0]
|
|
pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, firstNode.datasource.Type, firstNode.request.User, firstNode.datasource)
|
|
if err != nil {
|
|
for _, dn := range nodeGroup {
|
|
vars[dn.refID] = mathexp.Results{Error: datasources.ErrDataSourceNotFound}
|
|
}
|
|
return
|
|
}
|
|
|
|
logger := logger.FromContext(ctx).New("datasourceType", firstNode.datasource.Type,
|
|
"queryRefId", firstNode.refID,
|
|
"datasourceUid", firstNode.datasource.UID,
|
|
"datasourceVersion", firstNode.datasource.Version,
|
|
)
|
|
|
|
span.SetAttributes(
|
|
attribute.String("datasource.type", firstNode.datasource.Type),
|
|
attribute.String("datasource.uid", firstNode.datasource.UID),
|
|
)
|
|
|
|
req := &backend.QueryDataRequest{
|
|
PluginContext: pCtx,
|
|
Headers: firstNode.request.Headers,
|
|
}
|
|
|
|
for _, dn := range nodeGroup {
|
|
req.Queries = append(req.Queries, backend.DataQuery{
|
|
RefID: dn.refID,
|
|
MaxDataPoints: dn.maxDP,
|
|
Interval: time.Duration(int64(time.Millisecond) * dn.intervalMS),
|
|
JSON: dn.query,
|
|
TimeRange: dn.timeRange.AbsoluteTime(now),
|
|
QueryType: dn.queryType,
|
|
})
|
|
}
|
|
|
|
instrument := func(e error, rt string) {
|
|
respStatus := "success"
|
|
responseType := rt
|
|
if e != nil {
|
|
responseType = "error"
|
|
respStatus = "failure"
|
|
span.SetStatus(codes.Error, "failed to query data source")
|
|
span.RecordError(e)
|
|
}
|
|
logger.Debug("Data source queried", "responseType", responseType)
|
|
useDataplane := strings.HasPrefix(responseType, "dataplane-")
|
|
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), firstNode.datasource.Type).Inc()
|
|
}
|
|
|
|
resp, err := s.dataService.QueryData(ctx, req)
|
|
if err != nil {
|
|
for _, dn := range nodeGroup {
|
|
vars[dn.refID] = mathexp.Results{Error: MakeQueryError(firstNode.refID, firstNode.datasource.UID, err)}
|
|
}
|
|
instrument(err, "")
|
|
return
|
|
}
|
|
|
|
for _, dn := range nodeGroup {
|
|
dataFrames, err := getResponseFrame(logger, resp, dn.refID)
|
|
if err != nil {
|
|
vars[dn.refID] = mathexp.Results{Error: MakeQueryError(dn.refID, dn.datasource.UID, err)}
|
|
instrument(err, "")
|
|
return
|
|
}
|
|
|
|
var result mathexp.Results
|
|
responseType, result, err := s.converter.Convert(ctx, dn.datasource.Type, dataFrames, s.allowLongFrames)
|
|
if err != nil {
|
|
result.Error = makeConversionError(dn.RefID(), err)
|
|
}
|
|
instrument(err, responseType)
|
|
vars[dn.refID] = result
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
// Execute runs the node and adds the results to vars. If the node requires
|
|
// other nodes they must have already been executed and their results must
|
|
// already by in vars.
|
|
func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) {
|
|
logger := logger.FromContext(ctx).New("datasourceType", dn.datasource.Type, "queryRefId", dn.refID, "datasourceUid", dn.datasource.UID, "datasourceVersion", dn.datasource.Version)
|
|
ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery")
|
|
defer span.End()
|
|
|
|
pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, dn.datasource.Type, dn.request.User, dn.datasource)
|
|
if err != nil {
|
|
return mathexp.Results{}, err
|
|
}
|
|
span.SetAttributes(
|
|
attribute.String("datasource.type", dn.datasource.Type),
|
|
attribute.String("datasource.uid", dn.datasource.UID),
|
|
)
|
|
|
|
req := &backend.QueryDataRequest{
|
|
PluginContext: pCtx,
|
|
Queries: []backend.DataQuery{
|
|
{
|
|
RefID: dn.refID,
|
|
MaxDataPoints: dn.maxDP,
|
|
Interval: time.Duration(int64(time.Millisecond) * dn.intervalMS),
|
|
JSON: dn.query,
|
|
TimeRange: dn.timeRange.AbsoluteTime(now),
|
|
QueryType: dn.queryType,
|
|
},
|
|
},
|
|
Headers: dn.request.Headers,
|
|
}
|
|
|
|
responseType := "unknown"
|
|
respStatus := "success"
|
|
defer func() {
|
|
if e != nil {
|
|
responseType = "error"
|
|
respStatus = "failure"
|
|
span.SetStatus(codes.Error, "failed to query data source")
|
|
span.RecordError(e)
|
|
}
|
|
logger.Debug("Data source queried", "responseType", responseType)
|
|
useDataplane := strings.HasPrefix(responseType, "dataplane-")
|
|
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), dn.datasource.Type).Inc()
|
|
}()
|
|
|
|
resp, err := s.dataService.QueryData(ctx, req)
|
|
if err != nil {
|
|
return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err)
|
|
}
|
|
|
|
dataFrames, err := getResponseFrame(logger, resp, dn.refID)
|
|
if err != nil {
|
|
return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err)
|
|
}
|
|
|
|
var result mathexp.Results
|
|
responseType, result, err = s.converter.Convert(ctx, dn.datasource.Type, dataFrames, s.allowLongFrames)
|
|
if err != nil {
|
|
err = makeConversionError(dn.refID, err)
|
|
}
|
|
return result, err
|
|
}
|