mirror of
https://github.com/grafana/grafana.git
synced 2025-02-09 23:16:16 -06:00
410 lines
12 KiB
Go
410 lines
12 KiB
Go
package expr
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
|
"github.com/grafana/grafana-plugin-sdk-go/data/utils/jsoniter"
|
|
data "github.com/grafana/grafana-plugin-sdk-go/experimental/apis/data/v0alpha1"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
"gonum.org/v1/gonum/graph/simple"
|
|
|
|
"github.com/grafana/grafana/pkg/expr/classic"
|
|
"github.com/grafana/grafana/pkg/expr/mathexp"
|
|
"github.com/grafana/grafana/pkg/infra/log"
|
|
"github.com/grafana/grafana/pkg/services/datasources"
|
|
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
|
)
|
|
|
|
// nameLabelName is the label used when all mathexp.Series have 0 labels, so
// that they can still be identified by labels. The value of this label is
// extracted from value field names.
const nameLabelName string = "__name__"
|
|
|
|
var (
|
|
logger = log.New("expr")
|
|
)
|
|
|
|
// baseNode holds the properties that are common to all DPNodes.
type baseNode struct {
	// id is the value returned by ID(), which lets the node fulfill gonum's
	// graph Node interface.
	id int64
	// refID is the refId of the query or expression the node was built from.
	refID string
}
|
|
|
|
type rawNode struct {
|
|
RefID string `json:"refId"`
|
|
Query map[string]any
|
|
QueryRaw []byte
|
|
QueryType string
|
|
TimeRange TimeRange
|
|
DataSource *datasources.DataSource
|
|
// We use this index as the id of the node graph so the order can remain during a the stable sort of the dependency graph execution order.
|
|
// Some data sources, such as cloud watch, have order dependencies between queries.
|
|
idx int64
|
|
}
|
|
|
|
// getExpressionCommandTypeString returns the value of the "type" property of
// an expression query. It returns an error when the property is missing or
// when its value is not a string.
func getExpressionCommandTypeString(rawQuery map[string]any) (string, error) {
	rawType, found := rawQuery["type"]
	if !found {
		return "", errors.New("no expression command type in query")
	}
	if typeString, isString := rawType.(string); isString {
		return typeString, nil
	}
	return "", fmt.Errorf("expected expression command type to be a string, got type %T", rawType)
}
|
|
|
|
func GetExpressionCommandType(rawQuery map[string]any) (c CommandType, err error) {
|
|
typeString, err := getExpressionCommandTypeString(rawQuery)
|
|
if err != nil {
|
|
return c, err
|
|
}
|
|
return ParseCommandType(typeString)
|
|
}
|
|
|
|
// String returns a string representation of the node. In particular for
|
|
// %v formatting in error messages.
|
|
func (b *baseNode) String() string {
|
|
return b.refID
|
|
}
|
|
|
|
// CMDNode is a DPNode that holds an expression command.
|
|
type CMDNode struct {
|
|
baseNode
|
|
CMDType CommandType
|
|
Command Command
|
|
}
|
|
|
|
// ID returns the id of the node so it can fulfill the gonum's graph Node interface.
|
|
func (b *baseNode) ID() int64 {
|
|
return b.id
|
|
}
|
|
|
|
// RefID returns the refId of the node.
|
|
func (b *baseNode) RefID() string {
|
|
return b.refID
|
|
}
|
|
|
|
// NodeType returns the data pipeline node type.
|
|
func (gn *CMDNode) NodeType() NodeType {
|
|
return TypeCMDNode
|
|
}
|
|
|
|
func (gn *CMDNode) NeedsVars() []string {
|
|
return gn.Command.NeedsVars()
|
|
}
|
|
|
|
// Execute runs the node and adds the results to vars. If the node requires
|
|
// other nodes they must have already been executed and their results must
|
|
// already by in vars.
|
|
func (gn *CMDNode) Execute(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service) (mathexp.Results, error) {
|
|
return gn.Command.Execute(ctx, now, vars, s.tracer)
|
|
}
|
|
|
|
func buildCMDNode(rn *rawNode, toggles featuremgmt.FeatureToggles) (*CMDNode, error) {
|
|
commandType, err := GetExpressionCommandType(rn.Query)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid command type in expression '%v': %w", rn.RefID, err)
|
|
}
|
|
|
|
node := &CMDNode{
|
|
baseNode: baseNode{
|
|
id: rn.idx,
|
|
refID: rn.RefID,
|
|
},
|
|
CMDType: commandType,
|
|
}
|
|
|
|
if toggles.IsEnabledGlobally(featuremgmt.FlagExpressionParser) {
|
|
rn.QueryType, err = getExpressionCommandTypeString(rn.Query)
|
|
if err != nil {
|
|
return nil, err // should not happen because the command was parsed first thing
|
|
}
|
|
|
|
// NOTE: this structure of this is weird now, because it is targeting a structure
|
|
// where this is actually run in the root loop, however we want to verify the individual
|
|
// node parsing before changing the full tree parser
|
|
reader := NewExpressionQueryReader(toggles)
|
|
iter, err := jsoniter.ParseBytes(jsoniter.ConfigDefault, rn.QueryRaw)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
q, err := reader.ReadQuery(data.NewDataQuery(map[string]any{
|
|
"refId": rn.RefID,
|
|
"type": rn.QueryType,
|
|
}), iter)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
node.Command = q.Command
|
|
return node, err
|
|
}
|
|
|
|
switch commandType {
|
|
case TypeMath:
|
|
node.Command, err = UnmarshalMathCommand(rn)
|
|
case TypeReduce:
|
|
node.Command, err = UnmarshalReduceCommand(rn)
|
|
case TypeResample:
|
|
node.Command, err = UnmarshalResampleCommand(rn)
|
|
case TypeClassicConditions:
|
|
node.Command, err = classic.UnmarshalConditionsCmd(rn.Query, rn.RefID)
|
|
case TypeThreshold:
|
|
node.Command, err = UnmarshalThresholdCommand(rn, toggles)
|
|
case TypeSQL:
|
|
node.Command, err = UnmarshalSQLCommand(rn)
|
|
default:
|
|
return nil, fmt.Errorf("expression command type '%v' in expression '%v' not implemented", commandType, rn.RefID)
|
|
}
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to parse expression '%v': %w", rn.RefID, err)
|
|
}
|
|
|
|
return node, nil
|
|
}
|
|
|
|
const (
	// defaultIntervalMS is the interval, in milliseconds, given to a DSNode
	// when its query has no "intervalMs" property.
	defaultIntervalMS int64 = 64
	// defaultMaxDP is the maximum number of data points given to a DSNode
	// when its query has no "maxDataPoints" property.
	defaultMaxDP int64 = 5000
)
|
|
|
|
// DSNode is a DPNode that holds a datasource request.
|
|
type DSNode struct {
|
|
baseNode
|
|
query json.RawMessage
|
|
datasource *datasources.DataSource
|
|
|
|
orgID int64
|
|
queryType string
|
|
timeRange TimeRange
|
|
intervalMS int64
|
|
maxDP int64
|
|
request Request
|
|
}
|
|
|
|
func (dn *DSNode) String() string {
|
|
if dn.datasource == nil {
|
|
return "unknown"
|
|
}
|
|
return dn.datasource.Type
|
|
}
|
|
|
|
// NodeType returns the data pipeline node type.
|
|
func (dn *DSNode) NodeType() NodeType {
|
|
return TypeDatasourceNode
|
|
}
|
|
|
|
// NodeType returns the data pipeline node type.
|
|
func (dn *DSNode) NeedsVars() []string {
|
|
return []string{}
|
|
}
|
|
|
|
func (s *Service) buildDSNode(dp *simple.DirectedGraph, rn *rawNode, req *Request) (*DSNode, error) {
|
|
if rn.TimeRange == nil {
|
|
return nil, fmt.Errorf("time range must be specified for refID %s", rn.RefID)
|
|
}
|
|
encodedQuery, err := json.Marshal(rn.Query)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dsNode := &DSNode{
|
|
baseNode: baseNode{
|
|
id: rn.idx,
|
|
refID: rn.RefID,
|
|
},
|
|
orgID: req.OrgId,
|
|
query: json.RawMessage(encodedQuery),
|
|
queryType: rn.QueryType,
|
|
intervalMS: defaultIntervalMS,
|
|
maxDP: defaultMaxDP,
|
|
timeRange: rn.TimeRange,
|
|
request: *req,
|
|
datasource: rn.DataSource,
|
|
}
|
|
|
|
var floatIntervalMS float64
|
|
if rawIntervalMS, ok := rn.Query["intervalMs"]; ok {
|
|
if floatIntervalMS, ok = rawIntervalMS.(float64); !ok {
|
|
return nil, fmt.Errorf("expected intervalMs to be an float64, got type %T for refId %v", rawIntervalMS, rn.RefID)
|
|
}
|
|
dsNode.intervalMS = int64(floatIntervalMS)
|
|
}
|
|
|
|
var floatMaxDP float64
|
|
if rawMaxDP, ok := rn.Query["maxDataPoints"]; ok {
|
|
if floatMaxDP, ok = rawMaxDP.(float64); !ok {
|
|
return nil, fmt.Errorf("expected maxDataPoints to be an float64, got type %T for refId %v", rawMaxDP, rn.RefID)
|
|
}
|
|
dsNode.maxDP = int64(floatMaxDP)
|
|
}
|
|
|
|
return dsNode, nil
|
|
}
|
|
|
|
// executeDSNodesGrouped groups datasource node queries by the datasource instance, and then sends them
|
|
// in a single request with one or more queries to the datasource.
|
|
func executeDSNodesGrouped(ctx context.Context, now time.Time, vars mathexp.Vars, s *Service, nodes []*DSNode) {
|
|
type dsKey struct {
|
|
uid string // in theory I think this all I need for the key, but rather be safe
|
|
id int64
|
|
orgID int64
|
|
}
|
|
byDS := make(map[dsKey][]*DSNode)
|
|
for _, node := range nodes {
|
|
k := dsKey{id: node.datasource.ID, uid: node.datasource.UID, orgID: node.orgID}
|
|
byDS[k] = append(byDS[k], node)
|
|
}
|
|
|
|
for _, nodeGroup := range byDS {
|
|
func() {
|
|
ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery")
|
|
defer span.End()
|
|
firstNode := nodeGroup[0]
|
|
pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, firstNode.datasource.Type, firstNode.request.User, firstNode.datasource)
|
|
if err != nil {
|
|
for _, dn := range nodeGroup {
|
|
vars[dn.refID] = mathexp.Results{Error: datasources.ErrDataSourceNotFound}
|
|
}
|
|
return
|
|
}
|
|
|
|
logger := logger.FromContext(ctx).New("datasourceType", firstNode.datasource.Type,
|
|
"queryRefId", firstNode.refID,
|
|
"datasourceUid", firstNode.datasource.UID,
|
|
"datasourceVersion", firstNode.datasource.Version,
|
|
)
|
|
|
|
span.SetAttributes(
|
|
attribute.String("datasource.type", firstNode.datasource.Type),
|
|
attribute.String("datasource.uid", firstNode.datasource.UID),
|
|
)
|
|
|
|
req := &backend.QueryDataRequest{
|
|
PluginContext: pCtx,
|
|
Headers: firstNode.request.Headers,
|
|
}
|
|
|
|
for _, dn := range nodeGroup {
|
|
req.Queries = append(req.Queries, backend.DataQuery{
|
|
RefID: dn.refID,
|
|
MaxDataPoints: dn.maxDP,
|
|
Interval: time.Duration(int64(time.Millisecond) * dn.intervalMS),
|
|
JSON: dn.query,
|
|
TimeRange: dn.timeRange.AbsoluteTime(now),
|
|
QueryType: dn.queryType,
|
|
})
|
|
}
|
|
|
|
instrument := func(e error, rt string) {
|
|
respStatus := "success"
|
|
responseType := rt
|
|
if e != nil {
|
|
responseType = "error"
|
|
respStatus = "failure"
|
|
span.SetStatus(codes.Error, "failed to query data source")
|
|
span.RecordError(e)
|
|
}
|
|
logger.Debug("Data source queried", "responseType", responseType)
|
|
useDataplane := strings.HasPrefix(responseType, "dataplane-")
|
|
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), firstNode.datasource.Type).Inc()
|
|
}
|
|
|
|
resp, err := s.dataService.QueryData(ctx, req)
|
|
if err != nil {
|
|
for _, dn := range nodeGroup {
|
|
vars[dn.refID] = mathexp.Results{Error: MakeQueryError(firstNode.refID, firstNode.datasource.UID, err)}
|
|
}
|
|
instrument(err, "")
|
|
return
|
|
}
|
|
|
|
for _, dn := range nodeGroup {
|
|
dataFrames, err := getResponseFrame(resp, dn.refID)
|
|
if err != nil {
|
|
vars[dn.refID] = mathexp.Results{Error: MakeQueryError(dn.refID, dn.datasource.UID, err)}
|
|
instrument(err, "")
|
|
return
|
|
}
|
|
|
|
var result mathexp.Results
|
|
responseType, result, err := s.converter.Convert(ctx, dn.datasource.Type, dataFrames, s.allowLongFrames)
|
|
if err != nil {
|
|
result.Error = makeConversionError(dn.RefID(), err)
|
|
}
|
|
instrument(err, responseType)
|
|
vars[dn.refID] = result
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
// Execute runs the node and adds the results to vars. If the node requires
|
|
// other nodes they must have already been executed and their results must
|
|
// already by in vars.
|
|
func (dn *DSNode) Execute(ctx context.Context, now time.Time, _ mathexp.Vars, s *Service) (r mathexp.Results, e error) {
|
|
logger := logger.FromContext(ctx).New("datasourceType", dn.datasource.Type, "queryRefId", dn.refID, "datasourceUid", dn.datasource.UID, "datasourceVersion", dn.datasource.Version)
|
|
ctx, span := s.tracer.Start(ctx, "SSE.ExecuteDatasourceQuery")
|
|
defer span.End()
|
|
|
|
pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, dn.datasource.Type, dn.request.User, dn.datasource)
|
|
if err != nil {
|
|
return mathexp.Results{}, err
|
|
}
|
|
span.SetAttributes(
|
|
attribute.String("datasource.type", dn.datasource.Type),
|
|
attribute.String("datasource.uid", dn.datasource.UID),
|
|
)
|
|
|
|
req := &backend.QueryDataRequest{
|
|
PluginContext: pCtx,
|
|
Queries: []backend.DataQuery{
|
|
{
|
|
RefID: dn.refID,
|
|
MaxDataPoints: dn.maxDP,
|
|
Interval: time.Duration(int64(time.Millisecond) * dn.intervalMS),
|
|
JSON: dn.query,
|
|
TimeRange: dn.timeRange.AbsoluteTime(now),
|
|
QueryType: dn.queryType,
|
|
},
|
|
},
|
|
Headers: dn.request.Headers,
|
|
}
|
|
|
|
responseType := "unknown"
|
|
respStatus := "success"
|
|
defer func() {
|
|
if e != nil {
|
|
responseType = "error"
|
|
respStatus = "failure"
|
|
span.SetStatus(codes.Error, "failed to query data source")
|
|
span.RecordError(e)
|
|
}
|
|
logger.Debug("Data source queried", "responseType", responseType)
|
|
useDataplane := strings.HasPrefix(responseType, "dataplane-")
|
|
s.metrics.dsRequests.WithLabelValues(respStatus, fmt.Sprintf("%t", useDataplane), dn.datasource.Type).Inc()
|
|
}()
|
|
|
|
resp, err := s.dataService.QueryData(ctx, req)
|
|
if err != nil {
|
|
return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err)
|
|
}
|
|
|
|
dataFrames, err := getResponseFrame(resp, dn.refID)
|
|
if err != nil {
|
|
return mathexp.Results{}, MakeQueryError(dn.refID, dn.datasource.UID, err)
|
|
}
|
|
|
|
var result mathexp.Results
|
|
responseType, result, err = s.converter.Convert(ctx, dn.datasource.Type, dataFrames, s.allowLongFrames)
|
|
if err != nil {
|
|
err = makeConversionError(dn.refID, err)
|
|
}
|
|
return result, err
|
|
}
|