grafana/pkg/tsdb/loki/loki.go
Sven Grossmann ec857e1de9
Loki: Add dashboard and panel names as headers (#92096)
* feat(nameHeaders): add feature flag

* add safe parsing of headers

* use headers in loki datasource

* Loki: add option to pass headers to Loki

* Loki: add datasource tests for dashboard names

* cleanup

* DataSourceWithBackend: add test

* rename to `sanitizeHeader`

* Loki: add condition when to add headers

* Loki: add e2e tests

* Loki: change test name
2024-08-22 21:30:43 +02:00

302 lines
9.5 KiB
Go

package loki
import (
"context"
"encoding/json"
"fmt"
"net/http"
"regexp"
"sync"
"time"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/contexthandler"
"github.com/grafana/grafana/pkg/services/featuremgmt"
ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/tsdb/loki/kinds/dataquery"
)
type Service struct {
im instancemgmt.InstanceManager
tracer tracing.Tracer
logger log.Logger
}
var (
_ backend.QueryDataHandler = (*Service)(nil)
_ backend.StreamHandler = (*Service)(nil)
_ backend.CallResourceHandler = (*Service)(nil)
)
func ProvideService(httpClientProvider *httpclient.Provider, tracer tracing.Tracer) *Service {
return &Service{
im: datasource.NewInstanceManager(newInstanceSettings(httpClientProvider)),
tracer: tracer,
logger: backend.NewLoggerWith("logger", "tsdb.loki"),
}
}
var (
legendFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
stagePrepareRequest = "prepareRequest"
stageDatabaseRequest = "databaseRequest"
stageParseResponse = "parseResponse"
dashboardTitleHeader = "X-Dashboard-Title"
panelTitleHeader = "X-Panel-Title"
)
type datasourceInfo struct {
HTTPClient *http.Client
URL string
// open streams
streams map[string]data.FrameJSONCache
streamsMu sync.RWMutex
}
type QueryJSONModel struct {
dataquery.LokiDataQuery
Direction *string `json:"direction,omitempty"`
SupportingQueryType *string `json:"supportingQueryType"`
}
type ResponseOpts struct {
metricDataplane bool
logsDataplane bool
}
func parseQueryModel(raw json.RawMessage) (*QueryJSONModel, error) {
model := &QueryJSONModel{}
err := json.Unmarshal(raw, model)
return model, err
}
func newInstanceSettings(httpClientProvider *httpclient.Provider) datasource.InstanceFactoryFunc {
return func(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
opts, err := settings.HTTPClientOptions(ctx)
if err != nil {
return nil, err
}
opts.ForwardHTTPHeaders = true
client, err := httpClientProvider.New(opts)
if err != nil {
return nil, err
}
model := &datasourceInfo{
HTTPClient: client,
URL: settings.URL,
streams: make(map[string]data.FrameJSONCache),
}
return model, nil
}
}
func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
logger := s.logger.FromContext(ctx)
if err != nil {
logger.Error("Failed to get data source info", "error", err)
return err
}
return callResource(ctx, req, sender, dsInfo, logger, s.tracer)
}
func callResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender, dsInfo *datasourceInfo, plog log.Logger, tracer tracing.Tracer) error {
url := req.URL
lokiURL := fmt.Sprintf("/loki/api/v1/%s", url)
ctx, span := tracer.Start(ctx, "datasource.loki.CallResource", trace.WithAttributes(
attribute.String("url", lokiURL),
))
defer span.End()
api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, tracer, false)
rawLokiResponse, err := api.RawQuery(ctx, lokiURL)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
plog.Error("Failed resource call from loki", "err", err, "url", lokiURL)
return err
}
respHeaders := map[string][]string{
"content-type": {"application/json"},
}
if rawLokiResponse.Encoding != "" {
respHeaders["content-encoding"] = []string{rawLokiResponse.Encoding}
}
return sender.Send(&backend.CallResourceResponse{
Status: rawLokiResponse.Status,
Headers: respHeaders,
Body: rawLokiResponse.Body,
})
}
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
_, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName]
logger := s.logger.FromContext(ctx).With("fromAlert", fromAlert)
if err != nil {
logger.Error("Failed to get data source info", "err", err)
result := backend.NewQueryDataResponse()
return result, err
}
responseOpts := ResponseOpts{
metricDataplane: isFeatureEnabled(ctx, featuremgmt.FlagLokiMetricDataplane),
logsDataplane: isFeatureEnabled(ctx, featuremgmt.FlagLokiLogsDataplane),
}
if isFeatureEnabled(ctx, featuremgmt.FlagLokiSendDashboardPanelNames) {
s.applyHeaders(ctx, req)
}
return queryData(ctx, req, dsInfo, responseOpts, s.tracer, logger, isFeatureEnabled(ctx, featuremgmt.FlagLokiRunQueriesInParallel), isFeatureEnabled(ctx, featuremgmt.FlagLokiStructuredMetadata))
}
func (s *Service) applyHeaders(ctx context.Context, req backend.ForwardHTTPHeaders) {
reqCtx := contexthandler.FromContext(ctx)
if req == nil || reqCtx == nil || reqCtx.Req == nil {
return
}
var hList = []string{dashboardTitleHeader, panelTitleHeader}
for _, hName := range hList {
hVal := reqCtx.Req.Header.Get(hName)
if hVal == "" {
continue
}
req.SetHTTPHeader(hName, hVal)
}
}
func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datasourceInfo, responseOpts ResponseOpts, tracer tracing.Tracer, plog log.Logger, runInParallel bool, requestStructuredMetadata bool) (*backend.QueryDataResponse, error) {
result := backend.NewQueryDataResponse()
api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, tracer, requestStructuredMetadata)
start := time.Now()
queries, err := parseQuery(req)
if err != nil {
plog.Error("Failed to prepare request to Loki", "error", err, "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest)
return result, err
}
plog.Info("Prepared request to Loki", "duration", time.Since(start), "queriesLength", len(queries), "stage", stagePrepareRequest, "runInParallel", runInParallel)
ctx, span := tracer.Start(ctx, "datasource.loki.queryData.runQueries", trace.WithAttributes(
attribute.Bool("runInParallel", runInParallel),
attribute.Int("queriesLength", len(queries)),
))
if req.GetHTTPHeader("X-Query-Group-Id") != "" {
span.SetAttributes(attribute.String("query_group_id", req.GetHTTPHeader("X-Query-Group-Id")))
}
defer span.End()
start = time.Now()
// We are testing running of queries in parallel behind feature flag
if runInParallel {
resultLock := sync.Mutex{}
err = concurrency.ForEachJob(ctx, len(queries), 10, func(ctx context.Context, idx int) error {
query := queries[idx]
queryRes := executeQuery(ctx, query, req, runInParallel, api, responseOpts, tracer, plog)
resultLock.Lock()
defer resultLock.Unlock()
result.Responses[query.RefID] = queryRes
return nil // errors are saved per-query,always return nil
})
} else {
for _, query := range queries {
queryRes := executeQuery(ctx, query, req, runInParallel, api, responseOpts, tracer, plog)
result.Responses[query.RefID] = queryRes
}
}
plog.Debug("Executed queries", "duration", time.Since(start), "queriesLength", len(queries), "runInParallel", runInParallel)
return result, err
}
func executeQuery(ctx context.Context, query *lokiQuery, req *backend.QueryDataRequest, runInParallel bool, api *LokiAPI, responseOpts ResponseOpts, tracer tracing.Tracer, plog log.Logger) backend.DataResponse {
ctx, span := tracer.Start(ctx, "datasource.loki.queryData.runQueries.runQuery", trace.WithAttributes(
attribute.Bool("runInParallel", runInParallel),
attribute.String("expr", query.Expr),
attribute.Int64("start_unixnano", query.Start.UnixNano()),
attribute.Int64("stop_unixnano", query.End.UnixNano()),
))
if req.GetHTTPHeader("X-Query-Group-Id") != "" {
span.SetAttributes(attribute.String("query_group_id", req.GetHTTPHeader("X-Query-Group-Id")))
}
defer span.End()
queryRes, err := runQuery(ctx, api, query, responseOpts, plog)
if queryRes == nil {
// we always want to return a backend.DataResponse object, even if we received just an error
queryRes = &backend.DataResponse{}
}
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
queryRes.Error = err
}
return *queryRes
}
// we extracted this part of the functionality to make it easy to unit-test it
func runQuery(ctx context.Context, api *LokiAPI, query *lokiQuery, responseOpts ResponseOpts, plog log.Logger) (*backend.DataResponse, error) {
res, err := api.DataQuery(ctx, *query, responseOpts)
if err != nil {
plog.Error("Error querying loki", "error", err)
return res, err
}
for _, frame := range res.Frames {
err = adjustFrame(frame, query, !responseOpts.metricDataplane, responseOpts.logsDataplane)
if err != nil {
plog.Error("Error adjusting frame", "error", err)
return res, err
}
}
return res, nil
}
func (s *Service) getDSInfo(ctx context.Context, pluginCtx backend.PluginContext) (*datasourceInfo, error) {
i, err := s.im.Get(ctx, pluginCtx)
if err != nil {
return nil, err
}
instance, ok := i.(*datasourceInfo)
if !ok {
return nil, fmt.Errorf("failed to cast data source info")
}
return instance, nil
}
func isFeatureEnabled(ctx context.Context, feature string) bool {
return backend.GrafanaConfigFromContext(ctx).FeatureToggles().IsEnabled(feature)
}