grafana/pkg/tsdb/influxdb/influxql/influxql.go
ismail simsek 2cce9aa2f7
Chore: Move tracing function in influxdb package (#83899)
move tracing function in influxdb package
2024-03-05 07:34:08 -06:00

206 lines
5.9 KiB
Go

package influxql
import (
"context"
"errors"
"fmt"
"net/http"
"net/url"
"path"
"strings"
"sync"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/buffered"
"github.com/grafana/grafana/pkg/tsdb/influxdb/influxql/querydata"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
)
const defaultRetentionPolicy = "default"
var (
ErrInvalidHttpMode = errors.New("'httpMode' should be either 'GET' or 'POST'")
glog = log.New("tsdb.influx_influxql")
)
func Query(ctx context.Context, tracer trace.Tracer, dsInfo *models.DatasourceInfo, req *backend.QueryDataRequest, features featuremgmt.FeatureToggles) (*backend.QueryDataResponse, error) {
logger := glog.FromContext(ctx)
response := backend.NewQueryDataResponse()
var err error
// We are testing running of queries in parallel behind feature flag
if features.IsEnabled(ctx, featuremgmt.FlagInfluxdbRunQueriesInParallel) {
concurrentQueryCount, err := req.PluginContext.GrafanaConfig.ConcurrentQueryCount()
if err != nil {
logger.Debug(fmt.Sprintf("Concurrent Query Count read/parse error: %v", err), featuremgmt.FlagInfluxdbRunQueriesInParallel)
concurrentQueryCount = 10
}
responseLock := sync.Mutex{}
err = concurrency.ForEachJob(ctx, len(req.Queries), concurrentQueryCount, func(ctx context.Context, idx int) error {
reqQuery := req.Queries[idx]
query, err := models.QueryParse(reqQuery)
if err != nil {
return err
}
rawQuery, err := query.Build(req)
if err != nil {
return err
}
query.RefID = reqQuery.RefID
query.RawQuery = rawQuery
if setting.Env == setting.Dev {
logger.Debug("Influxdb query", "raw query", rawQuery)
}
request, err := createRequest(ctx, logger, dsInfo, rawQuery, query.Policy)
if err != nil {
return err
}
resp, err := execute(ctx, tracer, dsInfo, logger, query, request, features.IsEnabled(ctx, featuremgmt.FlagInfluxqlStreamingParser))
responseLock.Lock()
defer responseLock.Unlock()
if err != nil {
response.Responses[query.RefID] = backend.DataResponse{Error: err}
} else {
response.Responses[query.RefID] = resp
}
return nil // errors are saved per-query,always return nil
})
if err != nil {
logger.Debug("Influxdb concurrent query error", "concurrent query", err)
}
} else {
for _, reqQuery := range req.Queries {
query, err := models.QueryParse(reqQuery)
if err != nil {
return &backend.QueryDataResponse{}, err
}
rawQuery, err := query.Build(req)
if err != nil {
return &backend.QueryDataResponse{}, err
}
query.RefID = reqQuery.RefID
query.RawQuery = rawQuery
if setting.Env == setting.Dev {
logger.Debug("Influxdb query", "raw query", rawQuery)
}
request, err := createRequest(ctx, logger, dsInfo, rawQuery, query.Policy)
if err != nil {
return &backend.QueryDataResponse{}, err
}
resp, err := execute(ctx, tracer, dsInfo, logger, query, request, features.IsEnabled(ctx, featuremgmt.FlagInfluxqlStreamingParser))
if err != nil {
response.Responses[query.RefID] = backend.DataResponse{Error: err}
} else {
response.Responses[query.RefID] = resp
}
}
}
return response, err
}
func createRequest(ctx context.Context, logger log.Logger, dsInfo *models.DatasourceInfo, queryStr string, retentionPolicy string) (*http.Request, error) {
u, err := url.Parse(dsInfo.URL)
if err != nil {
return nil, err
}
u.Path = path.Join(u.Path, "query")
httpMode := dsInfo.HTTPMode
var req *http.Request
switch httpMode {
case "GET":
req, err = http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
if err != nil {
return nil, err
}
case "POST":
bodyValues := url.Values{}
bodyValues.Add("q", queryStr)
body := bodyValues.Encode()
req, err = http.NewRequestWithContext(ctx, http.MethodPost, u.String(), strings.NewReader(body))
if err != nil {
return nil, err
}
default:
return nil, ErrInvalidHttpMode
}
params := req.URL.Query()
params.Set("db", dsInfo.DbName)
params.Set("epoch", "ms")
// default is hardcoded default retention policy
// InfluxDB will use the default policy when it is not added to the request
if retentionPolicy != "" && retentionPolicy != "default" {
params.Set("rp", retentionPolicy)
}
if httpMode == "GET" {
params.Set("q", queryStr)
} else if httpMode == "POST" {
req.Header.Set("Content-type", "application/x-www-form-urlencoded")
}
req.URL.RawQuery = params.Encode()
logger.Debug("Influxdb request", "url", req.URL.String())
return req, nil
}
func execute(ctx context.Context, tracer trace.Tracer, dsInfo *models.DatasourceInfo, logger log.Logger, query *models.Query, request *http.Request, isStreamingParserEnabled bool) (backend.DataResponse, error) {
res, err := dsInfo.HTTPClient.Do(request)
if err != nil {
return backend.DataResponse{}, err
}
defer func() {
if err := res.Body.Close(); err != nil {
logger.Warn("Failed to close response body", "err", err)
}
}()
_, endSpan := startTrace(ctx, tracer, "datasource.influxdb.influxql.parseResponse")
defer endSpan()
var resp *backend.DataResponse
if isStreamingParserEnabled {
logger.Info("InfluxDB InfluxQL streaming parser enabled: ", "info")
resp = querydata.ResponseParse(res.Body, res.StatusCode, query)
} else {
resp = buffered.ResponseParse(res.Body, res.StatusCode, query)
}
return *resp, nil
}
// startTrace setups a trace but does not panic if tracer is nil which helps with testing
func startTrace(ctx context.Context, tracer trace.Tracer, name string, attributes ...attribute.KeyValue) (context.Context, func()) {
if tracer == nil {
return ctx, func() {}
}
ctx, span := tracer.Start(ctx, name, trace.WithAttributes(attributes...))
return ctx, func() {
span.End()
}
}