grafana/pkg/expr/nodes.go
Kyle Brandt 82aa000e9d
SSE: (Chore) Update log line with context (trace) (#89172)
Improves log line to help with debugging in Server Side Expressions. In particular, the traceId, datasourceType, and datasourceUid will now be included.
2024-06-13 10:58:39 -04:00

410 lines
12 KiB
Go

package expr
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data/utils/jsoniter"
data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"gonum.org/v1/gonum/graph/simple"
"github.com/grafana/grafana/pkg/expr/classic"
"github.com/grafana/grafana/pkg/expr/mathexp"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/featuremgmt"
)
// nameLabelName is the label that is used when all mathexp.Series have 0 labels,
// to make them identifiable by labels. The value of this label is extracted
// from value field names.
const nameLabelName = "__name__"
// logger is the package-level logger, created with the name "expr".
var (
logger = log.New("expr")
)
// baseNode includes common properties used across DPNodes.
type baseNode struct {
// id is the value returned by ID.
id int64
// refID is the value returned by RefID and String.
refID string
}
// rawNode holds the not-yet-parsed data of a single query. buildCMDNode and
// buildDSNode read it to construct their nodes.
type rawNode struct {
RefID string `json:"refId"`
// Query is the query as a generic map. buildCMDNode reads the expression
// command type from its "type" key; buildDSNode reads "intervalMs" and
// "maxDataPoints" from it.
Query map[string]any
// QueryRaw is the query as raw bytes. buildCMDNode parses it as JSON when the
// expression parser feature toggle is enabled.
QueryRaw []byte
QueryType string
TimeRange TimeRange
DataSource *datasources.DataSource
// We use this index as the id of the node graph so the order can remain during the stable sort of the dependency graph execution order.
// Some data sources, such as cloud watch, have order dependencies between queries.
idx int64
}
// getExpressionCommandTypeString reads the expression command type from the
// "type" key of rawQuery. It returns an error when the key is absent or when
// its value is not a string.
func getExpressionCommandTypeString(rawQuery map[string]any) (string, error) {
	cmdType, found := rawQuery["type"]
	if !found {
		return "", errors.New("no expression command type in query")
	}
	if name, isString := cmdType.(string); isString {
		return name, nil
	}
	return "", fmt.Errorf("expected expression command type to be a string, got type %T", cmdType)
}
// GetExpressionCommandType reads the "type" key of rawQuery and parses it into
// a CommandType. It returns an error when the key is missing, is not a string,
// or is rejected by ParseCommandType.
func GetExpressionCommandType(rawQuery map[string]any) (c CommandType, err error) {
	var name string
	if name, err = getExpressionCommandTypeString(rawQuery); err != nil {
		return c, err
	}
	return ParseCommandType(name)
}
// String returns the node's refID as its string representation. In particular
// for %v formatting in error messages.
func (b *baseNode) String() string {
return b.refID
}
// CMDNode is a DPNode that holds an expression command.
type CMDNode struct {
baseNode
// CMDType is the command type that buildCMDNode parsed from the query.
CMDType CommandType
// Command is the parsed command that Execute and NeedsVars delegate to.
Command Command
}
// ID returns the id of the node so it can fulfill gonum's graph Node interface.
func (b *baseNode) ID() int64 {
return b.id
}
// RefID returns the refID of the node.
func (b *baseNode) RefID() string {
return b.refID
}
// NodeType returns the data pipeline node type, which is TypeCMDNode for a
// command node.
func (gn *CMDNode) NodeType() NodeType {
return TypeCMDNode
}
// NeedsVars returns the variables that the node's command reports it needs.
func (gn *CMDNode) NeedsVars() []string {
return gn.Command.NeedsVars()
}
// Execute runs the node's command and returns its results. If the node requires
// other nodes they must have already been executed and their results must
// already be in vars.
func (gn *CMDNode) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service) (mathexp.Results, error) {
return gn.Command.Execute(ctx, now, vars, s.tracer)
}
// buildCMDNode parses the expression command described by rn into a CMDNode.
//
// The command type is read from the "type" key of rn.Query. When the
// expression parser feature toggle is enabled globally, rn.QueryType is
// overwritten with the command type string and the command is read from
// rn.QueryRaw through the expression query reader. Otherwise the command is
// unmarshalled by the function matching its type. An error is returned when
// the type is missing, invalid or not implemented, or when the command itself
// cannot be parsed.
func buildCMDNode(rn *rawNode, toggles featuremgmt.FeatureToggles) (*CMDNode, error) {
	cmdType, err := GetExpressionCommandType(rn.Query)
	if err != nil {
		return nil, fmt.Errorf("invalid command type in expression '%v': %w", rn.RefID, err)
	}

	cmdNode := &CMDNode{
		baseNode: baseNode{id: rn.idx, refID: rn.RefID},
		CMDType:  cmdType,
	}

	if toggles.IsEnabledGlobally(featuremgmt.FlagExpressionParser) {
		// An error here should not happen because the command was parsed first thing.
		if rn.QueryType, err = getExpressionCommandTypeString(rn.Query); err != nil {
			return nil, err
		}

		// NOTE: this structure of this is weird now, because it is targeting a structure
		// where this is actually run in the root loop, however we want to verify the individual
		// node parsing before changing the full tree parser
		queryReader := NewExpressionQueryReader(toggles)
		iter, parseErr := jsoniter.ParseBytes(jsoniter.ConfigDefault, rn.QueryRaw)
		if parseErr != nil {
			return nil, parseErr
		}
		common := data.NewDataQuery(map[string]any{
			"refId": rn.RefID,
			"type":  rn.QueryType,
		})
		parsed, readErr := queryReader.ReadQuery(common, iter)
		if readErr != nil {
			return nil, readErr
		}
		cmdNode.Command = parsed.Command
		return cmdNode, nil
	}

	// Legacy path: pick the unmarshaller that matches the command type.
	var cmd Command
	switch cmdType {
	case TypeMath:
		cmd, err = UnmarshalMathCommand(rn)
	case TypeReduce:
		cmd, err = UnmarshalReduceCommand(rn)
	case TypeResample:
		cmd, err = UnmarshalResampleCommand(rn)
	case TypeClassicConditions:
		cmd, err = classic.UnmarshalConditionsCmd(rn.Query, rn.RefID)
	case TypeThreshold:
		cmd, err = UnmarshalThresholdCommand(rn, toggles)
	case TypeSQL:
		cmd, err = UnmarshalSQLCommand(rn)
	default:
		return nil, fmt.Errorf("expression command type '%v' in expression '%v' not implemented", cmdType, rn.RefID)
	}
	if err != nil {
		return nil, fmt.Errorf("failed to parse expression '%v': %w", rn.RefID, err)
	}

	cmdNode.Command = cmd
	return cmdNode, nil
}
// Defaults that buildDSNode applies when the query does not set "intervalMs"
// or "maxDataPoints".
const (
defaultIntervalMS = int64(64)
defaultMaxDP = int64(5000)
)
// DSNode is a DPNode that holds a datasource request.
type DSNode struct {
baseNode
// query is the JSON-encoded query; it is sent as the JSON of the backend data query.
query json.RawMessage
// datasource is the datasource the query is sent to. String treats a nil
// datasource as "unknown".
datasource *datasources.DataSource
orgID int64
queryType string
timeRange TimeRange
// intervalMS is the query interval in milliseconds.
intervalMS int64
// maxDP is sent as the MaxDataPoints of the backend data query.
maxDP int64
// request is a copy of the request the node was built from; its User and
// Headers are used when the query is executed.
request Request
}
// String returns the type of the node's datasource, or "unknown" when the
// node has no datasource attached.
func (dn *DSNode) String() string {
	if ds := dn.datasource; ds != nil {
		return ds.Type
	}
	return "unknown"
}
// NodeType returns the data pipeline node type, which is TypeDatasourceNode
// for a datasource node.
func (dn *DSNode) NodeType() NodeType {
return TypeDatasourceNode
}
// NeedsVars returns the variables the node needs. For a datasource node this
// is always an empty slice.
func (dn *DSNode) NeedsVars() []string {
return []string{}
}
// buildDSNode creates a DSNode from rn for the given request.
//
// rn must have a time range. The query map is JSON-encoded and kept on the
// node. "intervalMs" and "maxDataPoints" are taken from the query when present
// (they must be float64 values and are truncated to int64); otherwise
// defaultIntervalMS and defaultMaxDP are used. dp is not used by this function.
func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Request) (*DSNode, error) {
	if rn.TimeRange == nil {
		return nil, fmt.Errorf("time range must be specified for refID %s", rn.RefID)
	}

	rawJSON, err := json.Marshal(rn.Query)
	if err != nil {
		return nil, err
	}

	node := &DSNode{
		baseNode:   baseNode{id: rn.idx, refID: rn.RefID},
		orgID:      req.OrgId,
		query:      json.RawMessage(rawJSON),
		queryType:  rn.QueryType,
		intervalMS: defaultIntervalMS,
		maxDP:      defaultMaxDP,
		timeRange:  rn.TimeRange,
		request:    *req,
		datasource: rn.DataSource,
	}

	// Override the default interval when the query carries its own.
	if value, found := rn.Query["intervalMs"]; found {
		ms, isFloat := value.(float64)
		if !isFloat {
			return nil, fmt.Errorf("expected intervalMs to be an float64, got type %T for refId %v", value, rn.RefID)
		}
		node.intervalMS = int64(ms)
	}

	// Override the default max data points when the query carries its own.
	if value, found := rn.Query["maxDataPoints"]; found {
		points, isFloat := value.(float64)
		if !isFloat {
			return nil, fmt.Errorf("expected maxDataPoints to be an float64, got type %T for refId %v", value, rn.RefID)
		}
		node.maxDP = int64(points)
	}

	return node, nil
}
// executeDSNodesGrouped groups datasource node queries by the datasource instance, and then sends them
// in a single request with one or more queries to the datasource.
//
// Nothing is returned: the outcome of every node is written into vars under
// the node's refID, with failures reported through mathexp.Results.Error.
//   - If the plugin context for a group cannot be obtained, every node of the
//     group gets datasources.ErrDataSourceNotFound.
//   - If the request to the datasource fails, every node of the group gets a
//     query error carrying its own refID.
//   - If the response holds no usable frames for one refID, only that node gets
//     a query error; the remaining nodes of the group are still processed.
//
// Each node must have a non-nil datasource.
func executeDSNodesGrouped(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service, nodes []*DSNode) {
	type dsKey struct {
		uid   string // in theory I think this all I need for the key, but rather be safe
		id    int64
		orgID int64
	}
	byDS := make(map[dsKey][]*DSNode)
	for _, node := range nodes {
		k := dsKey{id: node.datasource.ID, uid: node.datasource.UID, orgID: node.orgID}
		byDS[k] = append(byDS[k], node)
	}

	for _, nodeGroup := range byDS {
		// Run each group in its own function so the deferred span.End fires as
		// soon as the group is done, not when the whole loop has finished.
		func() {
			ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery")
			defer span.End()

			// All nodes of a group share the datasource, so the first node stands
			// in for the group when building the request, logger and span.
			firstNode := nodeGroup[0]
			pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, firstNode.datasource.Type, firstNode.request.User, firstNode.datasource)
			if err != nil {
				for _, dn := range nodeGroup {
					vars[dn.refID] = mathexp.Results{Error: datasources.ErrDataSourceNotFound}
				}
				return
			}

			logger := logger.FromContext(ctx).New("datasourceType", firstNode.datasource.Type,
				"queryRefId", firstNode.refID,
				"datasourceUid", firstNode.datasource.UID,
				"datasourceVersion", firstNode.datasource.Version,
			)

			span.SetAttributes(
				attribute.String("datasource.type", firstNode.datasource.Type),
				attribute.String("datasource.uid", firstNode.datasource.UID),
			)

			req := &backend.QueryDataRequest{
				PluginContext: pCtx,
				Headers:       firstNode.request.Headers,
			}

			for _, dn := range nodeGroup {
				req.Queries = append(req.Queries, backend.DataQuery{
					RefID:         dn.refID,
					MaxDataPoints: dn.maxDP,
					Interval:      time.Duration(int64(time.Millisecond) * dn.intervalMS),
					JSON:          dn.query,
					TimeRange:     dn.timeRange.AbsoluteTime(now),
					QueryType:     dn.queryType,
				})
			}

			// instrument logs the outcome, bumps the datasource request metric
			// and, on error, marks the span as failed.
			instrument := func(e error, rt string) {
				respStatus := "success"
				responseType := rt
				if e != nil {
					responseType = "error"
					respStatus = "failure"
					span.SetStatus(codes.Error, "failed to query data source")
					span.RecordError(e)
				}
				logger.Debug("Data source queried", "responseType", responseType)
				useDataplane := strings.HasPrefix(responseType, "dataplane-")
				s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), firstNode.datasource.Type).Inc()
			}

			resp, err := s.dataService.QueryData(ctx, req)
			if err != nil {
				// The whole request failed: report it on every node of the group,
				// each error naming the node's own refID.
				for _, dn := range nodeGroup {
					vars[dn.refID] = mathexp.Results{Error: MakeQueryError(dn.refID, dn.datasource.UID, err)}
				}
				instrument(err, "")
				return
			}

			for _, dn := range nodeGroup {
				dataFrames, err := getResponseFrame(logger, resp, dn.refID)
				if err != nil {
					vars[dn.refID] = mathexp.Results{Error: MakeQueryError(dn.refID, dn.datasource.UID, err)}
					instrument(err, "")
					// A failure for one query must not drop the results of the
					// remaining queries in the group, so move on to the next node.
					continue
				}

				responseType, result, err := s.converter.Convert(ctx, dn.datasource.Type, dataFrames, s.allowLongFrames)
				if err != nil {
					result.Error = makeConversionError(dn.RefID(), err)
				}
				instrument(err, responseType)
				vars[dn.refID] = result
			}
		}()
	}
}
// Execute sends the node's query to its datasource as a single-query request
// and returns the response converted to mathexp.Results. The vars argument is
// ignored.
//
// An error is returned when the plugin context cannot be obtained, when the
// datasource request fails, when the response holds no usable frames for the
// node's refID, or when the frames cannot be converted. Once the request has
// been built, the outcome is logged and counted in the datasource request
// metric, and an error is recorded on the span. The node must have a non-nil
// datasource.
func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) {
	ds := dn.datasource
	dsLogger := logger.FromContext(ctx).New("datasourceType", ds.Type, "queryRefId", dn.refID, "datasourceUid", ds.UID, "datasourceVersion", ds.Version)

	ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery")
	defer span.End()

	pluginCtx, err := s.pCtxProvider.GetWithDataSource(ctx, ds.Type, dn.request.User, ds)
	if err != nil {
		return mathexp.Results{}, err
	}

	span.SetAttributes(
		attribute.String("datasource.type", ds.Type),
		attribute.String("datasource.uid", ds.UID),
	)

	query := backend.DataQuery{
		RefID:         dn.refID,
		MaxDataPoints: dn.maxDP,
		Interval:      time.Duration(dn.intervalMS) * time.Millisecond,
		JSON:          dn.query,
		TimeRange:     dn.timeRange.AbsoluteTime(now),
		QueryType:     dn.queryType,
	}
	queryReq := &backend.QueryDataRequest{
		PluginContext: pluginCtx,
		Queries:       []backend.DataQuery{query},
		Headers:       dn.request.Headers,
	}

	// The deferred function reads the named result e, so it observes whichever
	// error the function ends up returning.
	respType, status := "unknown", "success"
	defer func() {
		if e != nil {
			respType, status = "error", "failure"
			span.SetStatus(codes.Error, "failed to query data source")
			span.RecordError(e)
		}
		dsLogger.Debug("Data source queried", "responseType", respType)
		isDataplane := strings.HasPrefix(respType, "dataplane-")
		s.metrics.dsRequests.WithLabelValues(status, fmt.Sprintf("%t", isDataplane), ds.Type).Inc()
	}()

	resp, err := s.dataService.QueryData(ctx, queryReq)
	if err != nil {
		return mathexp.Results{}, MakeQueryError(dn.refID, ds.UID, err)
	}

	frames, err := getResponseFrame(dsLogger, resp, dn.refID)
	if err != nil {
		return mathexp.Results{}, MakeQueryError(dn.refID, ds.UID, err)
	}

	var converted mathexp.Results
	respType, converted, err = s.converter.Convert(ctx, ds.Type, frames, s.allowLongFrames)
	if err != nil {
		return converted, makeConversionError(dn.refID, err)
	}
	return converted, nil
}