grafana/pkg/services/query/query.go
2024-02-21 09:38:42 +01:00

379 lines
12 KiB
Go

package query
import (
"context"
"fmt"
"net/http"
"runtime"
"slices"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"golang.org/x/sync/errgroup"
"github.com/grafana/grafana/pkg/api/dtos"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/expr"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/services/auth/identity"
"github.com/grafana/grafana/pkg/services/contexthandler"
"github.com/grafana/grafana/pkg/services/datasources"
"github.com/grafana/grafana/pkg/services/pluginsintegration/plugincontext"
"github.com/grafana/grafana/pkg/services/validations"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/grafanads"
"github.com/grafana/grafana/pkg/tsdb/legacydata"
"github.com/grafana/grafana/pkg/util/errutil"
)
const (
HeaderPluginID = "X-Plugin-Id" // can be used for routing
HeaderDatasourceUID = "X-Datasource-Uid" // can be used for routing/ load balancing
HeaderDashboardUID = "X-Dashboard-Uid" // mainly useful for debugging slow queries
HeaderPanelID = "X-Panel-Id" // mainly useful for debugging slow queries
HeaderPanelPluginId = "X-Panel-Plugin-Id"
HeaderQueryGroupID = "X-Query-Group-Id" // mainly useful for finding related queries with query chunking
HeaderFromExpression = "X-Grafana-From-Expr" // used by datasources to identify expression queries
)
func ProvideService(
cfg *setting.Cfg,
dataSourceCache datasources.CacheService,
expressionService *expr.Service,
pluginRequestValidator validations.PluginRequestValidator,
pluginClient plugins.Client,
pCtxProvider *plugincontext.Provider,
) *ServiceImpl {
g := &ServiceImpl{
cfg: cfg,
dataSourceCache: dataSourceCache,
expressionService: expressionService,
pluginRequestValidator: pluginRequestValidator,
pluginClient: pluginClient,
pCtxProvider: pCtxProvider,
log: log.New("query_data"),
concurrentQueryLimit: cfg.SectionWithEnvOverrides("query").Key("concurrent_query_limit").MustInt(runtime.NumCPU()),
}
g.log.Info("Query Service initialization")
return g
}
//go:generate mockery --name Service --structname FakeQueryService --inpackage --filename query_service_mock.go
type Service interface {
Run(ctx context.Context) error
QueryData(ctx context.Context, user identity.Requester, skipDSCache bool, reqDTO dtos.MetricRequest) (*backend.QueryDataResponse, error)
}
// Gives us compile time error if the service does not adhere to the contract of the interface
var _ Service = (*ServiceImpl)(nil)
type ServiceImpl struct {
cfg *setting.Cfg
dataSourceCache datasources.CacheService
expressionService *expr.Service
pluginRequestValidator validations.PluginRequestValidator
pluginClient plugins.Client
pCtxProvider *plugincontext.Provider
log log.Logger
concurrentQueryLimit int
}
// Run ServiceImpl.
func (s *ServiceImpl) Run(ctx context.Context) error {
<-ctx.Done()
return ctx.Err()
}
// QueryData processes queries and returns query responses. It handles queries to single or mixed datasources, as well as expressions.
func (s *ServiceImpl) QueryData(ctx context.Context, user identity.Requester, skipDSCache bool, reqDTO dtos.MetricRequest) (*backend.QueryDataResponse, error) {
// Parse the request into parsed queries grouped by datasource uid
parsedReq, err := s.parseMetricRequest(ctx, user, skipDSCache, reqDTO)
if err != nil {
return nil, err
}
// If there are expressions, handle them and return
if parsedReq.hasExpression {
return s.handleExpressions(ctx, user, parsedReq)
}
// If there is only one datasource, query it and return
if len(parsedReq.parsedQueries) == 1 {
return s.handleQuerySingleDatasource(ctx, user, parsedReq)
}
// If there are multiple datasources, handle their queries concurrently and return the aggregate result
return s.executeConcurrentQueries(ctx, user, skipDSCache, reqDTO, parsedReq.parsedQueries)
}
// splitResponse contains the results of a concurrent data source query - the response and any headers
type splitResponse struct {
responses backend.Responses
header http.Header
}
// executeConcurrentQueries executes queries to multiple datasources concurrently and returns the aggregate result.
func (s *ServiceImpl) executeConcurrentQueries(ctx context.Context, user identity.Requester, skipDSCache bool, reqDTO dtos.MetricRequest, queriesbyDs map[string][]parsedQuery) (*backend.QueryDataResponse, error) {
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(s.concurrentQueryLimit) // prevent too many concurrent requests
rchan := make(chan splitResponse, len(queriesbyDs))
// Create panic recovery function for loop below
recoveryFn := func(queries []*simplejson.Json) {
if r := recover(); r != nil {
var err error
s.log.Error("query datasource panic", "error", r, "stack", log.Stack(1))
if theErr, ok := r.(error); ok {
err = theErr
} else if theErrString, ok := r.(string); ok {
err = fmt.Errorf(theErrString)
} else {
err = fmt.Errorf("unexpected error - %s", s.cfg.UserFacingDefaultError)
}
// Due to the panic, there is no valid response for any query for this datasource. Append an error for each one.
rchan <- buildErrorResponses(err, queries)
}
}
// Query each datasource concurrently
for _, queries := range queriesbyDs {
rawQueries := make([]*simplejson.Json, len(queries))
for i := 0; i < len(queries); i++ {
rawQueries[i] = queries[i].rawQuery
}
g.Go(func() error {
subDTO := reqDTO.CloneWithQueries(rawQueries)
// Handle panics in the datasource qery
defer recoveryFn(subDTO.Queries)
ctxCopy := contexthandler.CopyWithReqContext(ctx)
subResp, err := s.QueryData(ctxCopy, user, skipDSCache, subDTO)
if err == nil {
reqCtx, header := contexthandler.FromContext(ctxCopy), http.Header{}
if reqCtx != nil {
header = reqCtx.Resp.Header()
}
rchan <- splitResponse{subResp.Responses, header}
} else {
// If there was an error, return an error response for each query for this datasource
rchan <- buildErrorResponses(err, subDTO.Queries)
}
return nil
})
}
if err := g.Wait(); err != nil {
return nil, err
}
close(rchan)
resp := backend.NewQueryDataResponse()
reqCtx := contexthandler.FromContext(ctx)
for result := range rchan {
for refId, dataResponse := range result.responses {
resp.Responses[refId] = dataResponse
}
if reqCtx != nil {
for k, v := range result.header {
for _, val := range v {
if !slices.Contains(reqCtx.Resp.Header().Values(k), val) {
reqCtx.Resp.Header().Add(k, val)
} else {
s.log.Warn("skipped duplicate response header", "header", k, "value", val)
}
}
}
}
}
return resp, nil
}
// buildErrorResponses applies the provided error to each query response in the list. These queries should all belong to the same datasource.
func buildErrorResponses(err error, queries []*simplejson.Json) splitResponse {
er := backend.Responses{}
for _, query := range queries {
er[query.Get("refId").MustString("A")] = backend.DataResponse{
Error: err,
}
}
return splitResponse{er, http.Header{}}
}
// handleExpressions handles POST /api/ds/query when there is an expression.
func (s *ServiceImpl) handleExpressions(ctx context.Context, user identity.Requester, parsedReq *parsedRequest) (*backend.QueryDataResponse, error) {
exprReq := expr.Request{
Queries: []expr.Query{},
}
if user != nil { // for passthrough authentication, SSE does not authenticate
exprReq.User = user
exprReq.OrgId = user.GetOrgID()
}
for _, pq := range parsedReq.getFlattenedQueries() {
if pq.datasource == nil {
return nil, ErrMissingDataSourceInfo.Build(errutil.TemplateData{
Public: map[string]any{
"RefId": pq.query.RefID,
},
})
}
exprReq.Queries = append(exprReq.Queries, expr.Query{
JSON: pq.query.JSON,
Interval: pq.query.Interval,
RefID: pq.query.RefID,
MaxDataPoints: pq.query.MaxDataPoints,
QueryType: pq.query.QueryType,
DataSource: pq.datasource,
TimeRange: expr.AbsoluteTimeRange{
From: pq.query.TimeRange.From,
To: pq.query.TimeRange.To,
},
})
}
qdr, err := s.expressionService.TransformData(ctx, time.Now(), &exprReq) // use time now because all queries have absolute time range
if err != nil {
return nil, fmt.Errorf("expression request error: %w", err)
}
return qdr, nil
}
// handleQuerySingleDatasource handles one or more queries to a single datasource
func (s *ServiceImpl) handleQuerySingleDatasource(ctx context.Context, user identity.Requester, parsedReq *parsedRequest) (*backend.QueryDataResponse, error) {
queries := parsedReq.getFlattenedQueries()
ds := queries[0].datasource
if err := s.pluginRequestValidator.Validate(ds.URL, nil); err != nil {
return nil, datasources.ErrDataSourceAccessDenied
}
// ensure that each query passed to this function has the same datasource
for _, pq := range queries {
if ds.UID != pq.datasource.UID {
return nil, fmt.Errorf("all queries must have the same datasource - found %s and %s", ds.UID, pq.datasource.UID)
}
}
pCtx, err := s.pCtxProvider.GetWithDataSource(ctx, ds.Type, user, ds)
if err != nil {
return nil, err
}
req := &backend.QueryDataRequest{
PluginContext: pCtx,
Headers: map[string]string{},
Queries: []backend.DataQuery{},
}
for _, q := range queries {
req.Queries = append(req.Queries, q.query)
}
return s.pluginClient.QueryData(ctx, req)
}
// parseRequest parses a request into parsed queries grouped by datasource uid
func (s *ServiceImpl) parseMetricRequest(ctx context.Context, user identity.Requester, skipDSCache bool, reqDTO dtos.MetricRequest) (*parsedRequest, error) {
if len(reqDTO.Queries) == 0 {
return nil, ErrNoQueriesFound
}
timeRange := legacydata.NewDataTimeRange(reqDTO.From, reqDTO.To)
req := &parsedRequest{
hasExpression: false,
parsedQueries: make(map[string][]parsedQuery),
dsTypes: make(map[string]bool),
}
// Parse the queries and store them by datasource
datasourcesByUid := map[string]*datasources.DataSource{}
for _, query := range reqDTO.Queries {
ds, err := s.getDataSourceFromQuery(ctx, user, skipDSCache, query, datasourcesByUid)
if err != nil {
return nil, err
}
if ds == nil {
return nil, ErrInvalidDatasourceID
}
datasourcesByUid[ds.UID] = ds
if expr.NodeTypeFromDatasourceUID(ds.UID) != expr.TypeDatasourceNode {
req.hasExpression = true
} else {
req.dsTypes[ds.Type] = true
}
if _, ok := req.parsedQueries[ds.UID]; !ok {
req.parsedQueries[ds.UID] = []parsedQuery{}
}
s.log.Debug("Processing metrics query", "query", query)
modelJSON, err := query.MarshalJSON()
if err != nil {
return nil, err
}
req.parsedQueries[ds.UID] = append(req.parsedQueries[ds.UID], parsedQuery{
datasource: ds,
query: backend.DataQuery{
TimeRange: backend.TimeRange{
From: timeRange.GetFromAsTimeUTC(),
To: timeRange.GetToAsTimeUTC(),
},
RefID: query.Get("refId").MustString("A"),
MaxDataPoints: query.Get("maxDataPoints").MustInt64(100),
Interval: time.Duration(query.Get("intervalMs").MustInt64(1000)) * time.Millisecond,
QueryType: query.Get("queryType").MustString(""),
JSON: modelJSON,
},
rawQuery: query,
})
}
return req, req.validateRequest(ctx)
}
func (s *ServiceImpl) getDataSourceFromQuery(ctx context.Context, user identity.Requester, skipDSCache bool, query *simplejson.Json, history map[string]*datasources.DataSource) (*datasources.DataSource, error) {
var err error
uid := query.Get("datasource").Get("uid").MustString()
// before 8.3 special types could be sent as datasource (expr)
if uid == "" {
uid = query.Get("datasource").MustString()
}
// check cache value
ds, ok := history[uid]
if ok {
return ds, nil
}
if kind := expr.NodeTypeFromDatasourceUID(uid); kind != expr.TypeDatasourceNode {
return expr.DataSourceModelFromNodeType(kind)
}
if uid == grafanads.DatasourceUID {
return grafanads.DataSourceModel(user.GetOrgID()), nil
}
if uid != "" {
ds, err = s.dataSourceCache.GetDatasourceByUID(ctx, uid, user, skipDSCache)
if err != nil {
return nil, err
}
return ds, nil
}
// use datasourceId if it exists
id := query.Get("datasourceId").MustInt64(0)
if id > 0 {
ds, err = s.dataSourceCache.GetDatasource(ctx, id, user, skipDSCache)
if err != nil {
return nil, err
}
return ds, nil
}
return nil, ErrInvalidDatasourceID
}