mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Loki: Improve backend instrumentation of QueryData
calls (#73807)
* improve loki `queryData` instrumentation * fix lint * use correct logger in "queryData" * capitalize log messages * distinguish between requests from alerting
This commit is contained in:
parent
f4c127a1d8
commit
124f445db0
@ -10,9 +10,12 @@ import (
|
|||||||
"net/url"
|
"net/url"
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
jsoniter "github.com/json-iterator/go"
|
jsoniter "github.com/json-iterator/go"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||||
@ -23,6 +26,7 @@ type LokiAPI struct {
|
|||||||
client *http.Client
|
client *http.Client
|
||||||
url string
|
url string
|
||||||
log log.Logger
|
log log.Logger
|
||||||
|
tracer tracing.Tracer
|
||||||
}
|
}
|
||||||
|
|
||||||
type RawLokiResponse struct {
|
type RawLokiResponse struct {
|
||||||
@ -31,8 +35,8 @@ type RawLokiResponse struct {
|
|||||||
Encoding string
|
Encoding string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newLokiAPI(client *http.Client, url string, log log.Logger) *LokiAPI {
|
func newLokiAPI(client *http.Client, url string, log log.Logger, tracer tracing.Tracer) *LokiAPI {
|
||||||
return &LokiAPI{client: client, url: url, log: log}
|
return &LokiAPI{client: client, url: url, log: log, tracer: tracer}
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*http.Request, error) {
|
func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*http.Request, error) {
|
||||||
@ -154,11 +158,17 @@ func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
api.log.Debug("Sending query to loki", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr, "queryType", query.QueryType, "direction", query.Direction, "maxLines", query.MaxLines, "supportingQueryType", query.SupportingQueryType, "lokiHost", req.URL.Host, "lokiPath", req.URL.Path)
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
resp, err := api.client.Do(req)
|
resp, err := api.client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
took := time.Since(start)
|
||||||
|
api.log.Debug("Response received from loki", "took", took, "status", resp.StatusCode, "length", resp.Header.Get("Content-Length"))
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := resp.Body.Close(); err != nil {
|
if err := resp.Body.Close(); err != nil {
|
||||||
api.log.Warn("Failed to close response body", "err", err)
|
api.log.Warn("Failed to close response body", "err", err)
|
||||||
@ -166,16 +176,29 @@ func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
if resp.StatusCode/100 != 2 {
|
if resp.StatusCode/100 != 2 {
|
||||||
return nil, readLokiError(resp.Body)
|
err := readLokiError(resp.Body)
|
||||||
|
api.log.Error("Error received from loki", "err", err, "status", resp.StatusCode)
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start = time.Now()
|
||||||
|
_, span := api.tracer.Start(ctx, "datasource.loki.parseResponse")
|
||||||
|
span.SetAttributes("metricDataplane", responseOpts.metricDataplane, attribute.Key("metricDataplane").Bool(responseOpts.metricDataplane))
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
iter := jsoniter.Parse(jsoniter.ConfigDefault, resp.Body, 1024)
|
iter := jsoniter.Parse(jsoniter.ConfigDefault, resp.Body, 1024)
|
||||||
res := converter.ReadPrometheusStyleResult(iter, converter.Options{Dataplane: responseOpts.metricDataplane})
|
res := converter.ReadPrometheusStyleResult(iter, converter.Options{Dataplane: responseOpts.metricDataplane})
|
||||||
|
took = time.Since(start)
|
||||||
|
|
||||||
if res.Error != nil {
|
if res.Error != nil {
|
||||||
|
span.RecordError(res.Error)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
|
logger.Error("Error parsing response from loki", "err", res.Error, "metricDataplane", responseOpts.metricDataplane, "took", took)
|
||||||
return nil, res.Error
|
return nil, res.Error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.Debug("Response parsed from loki", "took", took, "metricDataplane", responseOpts.metricDataplane, "framesLength", len(res.Frames))
|
||||||
|
|
||||||
return res.Frames, nil
|
return res.Frames, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
|
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||||
)
|
)
|
||||||
|
|
||||||
type mockRequestCallback func(req *http.Request)
|
type mockRequestCallback func(req *http.Request)
|
||||||
@ -64,7 +65,7 @@ func makeMockedAPIWithUrl(url string, statusCode int, contentType string, respon
|
|||||||
Transport: &mockedRoundTripper{statusCode: statusCode, contentType: contentType, responseBytes: responseBytes, requestCallback: requestCallback},
|
Transport: &mockedRoundTripper{statusCode: statusCode, contentType: contentType, responseBytes: responseBytes, requestCallback: requestCallback},
|
||||||
}
|
}
|
||||||
|
|
||||||
return newLokiAPI(&client, url, log.New("test"))
|
return newLokiAPI(&client, url, log.New("test"), tracing.NewFakeTracer())
|
||||||
}
|
}
|
||||||
|
|
||||||
func makeCompressedMockedAPIWithUrl(url string, statusCode int, contentType string, responseBytes []byte, requestCallback mockRequestCallback) *LokiAPI {
|
func makeCompressedMockedAPIWithUrl(url string, statusCode int, contentType string, responseBytes []byte, requestCallback mockRequestCallback) *LokiAPI {
|
||||||
@ -72,5 +73,5 @@ func makeCompressedMockedAPIWithUrl(url string, statusCode int, contentType stri
|
|||||||
Transport: &mockedCompressedRoundTripper{statusCode: statusCode, contentType: contentType, responseBytes: responseBytes, requestCallback: requestCallback},
|
Transport: &mockedCompressedRoundTripper{statusCode: statusCode, contentType: contentType, responseBytes: responseBytes, requestCallback: requestCallback},
|
||||||
}
|
}
|
||||||
|
|
||||||
return newLokiAPI(&client, url, log.New("test"))
|
return newLokiAPI(&client, url, log.New("test"), tracing.NewFakeTracer())
|
||||||
}
|
}
|
||||||
|
@ -35,14 +35,14 @@ func adjustMetricFrame(frame *data.Frame, query *lokiQuery, setFrameName bool) e
|
|||||||
fields := frame.Fields
|
fields := frame.Fields
|
||||||
// we check if the fields are of correct type
|
// we check if the fields are of correct type
|
||||||
if len(fields) != 2 {
|
if len(fields) != 2 {
|
||||||
return fmt.Errorf("invalid fields in metric frame")
|
return fmt.Errorf("invalid field length in metric frame. expected 2, got %d", len(fields))
|
||||||
}
|
}
|
||||||
|
|
||||||
timeField := fields[0]
|
timeField := fields[0]
|
||||||
valueField := fields[1]
|
valueField := fields[1]
|
||||||
|
|
||||||
if (timeField.Type() != data.FieldTypeTime) || (valueField.Type() != data.FieldTypeFloat64) {
|
if (timeField.Type() != data.FieldTypeTime) || (valueField.Type() != data.FieldTypeFloat64) {
|
||||||
return fmt.Errorf("invalid fields in metric frame")
|
return fmt.Errorf("invalid field types in metric frame. expected time and float64, got %s and %s", timeField.Type(), valueField.Type())
|
||||||
}
|
}
|
||||||
|
|
||||||
labels := getFrameLabels(frame)
|
labels := getFrameLabels(frame)
|
||||||
@ -94,7 +94,7 @@ func adjustLegacyLogsFrame(frame *data.Frame, query *lokiQuery) error {
|
|||||||
// we check if the fields are of correct type and length
|
// we check if the fields are of correct type and length
|
||||||
fields := frame.Fields
|
fields := frame.Fields
|
||||||
if len(fields) != 4 {
|
if len(fields) != 4 {
|
||||||
return fmt.Errorf("invalid fields in logs frame")
|
return fmt.Errorf("invalid field length in logs frame. expected 4, got %d", len(fields))
|
||||||
}
|
}
|
||||||
|
|
||||||
labelsField := fields[0]
|
labelsField := fields[0]
|
||||||
@ -103,11 +103,11 @@ func adjustLegacyLogsFrame(frame *data.Frame, query *lokiQuery) error {
|
|||||||
stringTimeField := fields[3]
|
stringTimeField := fields[3]
|
||||||
|
|
||||||
if (timeField.Type() != data.FieldTypeTime) || (lineField.Type() != data.FieldTypeString) || (labelsField.Type() != data.FieldTypeJSON) || (stringTimeField.Type() != data.FieldTypeString) {
|
if (timeField.Type() != data.FieldTypeTime) || (lineField.Type() != data.FieldTypeString) || (labelsField.Type() != data.FieldTypeJSON) || (stringTimeField.Type() != data.FieldTypeString) {
|
||||||
return fmt.Errorf("invalid fields in logs frame")
|
return fmt.Errorf("invalid field types in logs frame. expected time, string, json and string, got %s, %s, %s and %s", timeField.Type(), lineField.Type(), labelsField.Type(), stringTimeField.Type())
|
||||||
}
|
}
|
||||||
|
|
||||||
if (timeField.Len() != lineField.Len()) || (timeField.Len() != labelsField.Len()) || (timeField.Len() != stringTimeField.Len()) {
|
if (timeField.Len() != lineField.Len()) || (timeField.Len() != labelsField.Len()) || (timeField.Len() != stringTimeField.Len()) {
|
||||||
return fmt.Errorf("invalid fields in logs frame")
|
return fmt.Errorf("indifferent field lengths in logs frame. expected all to be equal, got %d, %d, %d and %d", timeField.Len(), lineField.Len(), labelsField.Len(), stringTimeField.Len())
|
||||||
}
|
}
|
||||||
|
|
||||||
// this returns an error when the length of fields do not match
|
// this returns an error when the length of fields do not match
|
||||||
@ -150,7 +150,7 @@ func adjustDataplaneLogsFrame(frame *data.Frame, query *lokiQuery) error {
|
|||||||
// we check if the fields are of correct type and length
|
// we check if the fields are of correct type and length
|
||||||
fields := frame.Fields
|
fields := frame.Fields
|
||||||
if len(fields) != 4 {
|
if len(fields) != 4 {
|
||||||
return fmt.Errorf("invalid fields in logs frame")
|
return fmt.Errorf("invalid field length in logs frame. expected 4, got %d", len(fields))
|
||||||
}
|
}
|
||||||
|
|
||||||
labelsField := fields[0]
|
labelsField := fields[0]
|
||||||
@ -159,11 +159,11 @@ func adjustDataplaneLogsFrame(frame *data.Frame, query *lokiQuery) error {
|
|||||||
stringTimeField := fields[3]
|
stringTimeField := fields[3]
|
||||||
|
|
||||||
if (timeField.Type() != data.FieldTypeTime) || (lineField.Type() != data.FieldTypeString) || (labelsField.Type() != data.FieldTypeJSON) || (stringTimeField.Type() != data.FieldTypeString) {
|
if (timeField.Type() != data.FieldTypeTime) || (lineField.Type() != data.FieldTypeString) || (labelsField.Type() != data.FieldTypeJSON) || (stringTimeField.Type() != data.FieldTypeString) {
|
||||||
return fmt.Errorf("invalid fields in logs frame")
|
return fmt.Errorf("invalid field types in logs frame. expected time, string, json and string, got %s, %s, %s and %s", timeField.Type(), lineField.Type(), labelsField.Type(), stringTimeField.Type())
|
||||||
}
|
}
|
||||||
|
|
||||||
if (timeField.Len() != lineField.Len()) || (timeField.Len() != labelsField.Len()) || (timeField.Len() != stringTimeField.Len()) {
|
if (timeField.Len() != lineField.Len()) || (timeField.Len() != labelsField.Len()) || (timeField.Len() != stringTimeField.Len()) {
|
||||||
return fmt.Errorf("invalid fields in logs frame")
|
return fmt.Errorf("indifferent field lengths in logs frame. expected all to be equal, got %d, %d, %d and %d", timeField.Len(), lineField.Len(), labelsField.Len(), stringTimeField.Len())
|
||||||
}
|
}
|
||||||
|
|
||||||
// this returns an error when the length of fields do not match
|
// this returns an error when the length of fields do not match
|
||||||
|
@ -14,11 +14,13 @@ import (
|
|||||||
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
||||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||||
"go.opentelemetry.io/otel/attribute"
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
|
||||||
"github.com/grafana/grafana/pkg/infra/httpclient"
|
"github.com/grafana/grafana/pkg/infra/httpclient"
|
||||||
"github.com/grafana/grafana/pkg/infra/log"
|
"github.com/grafana/grafana/pkg/infra/log"
|
||||||
"github.com/grafana/grafana/pkg/infra/tracing"
|
"github.com/grafana/grafana/pkg/infra/tracing"
|
||||||
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
"github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||||
|
ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
|
||||||
"github.com/grafana/grafana/pkg/tsdb/loki/kinds/dataquery"
|
"github.com/grafana/grafana/pkg/tsdb/loki/kinds/dataquery"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -122,7 +124,7 @@ func callResource(ctx context.Context, req *backend.CallResourceRequest, sender
|
|||||||
span.SetAttributes("url", lokiURL, attribute.Key("url").String(lokiURL))
|
span.SetAttributes("url", lokiURL, attribute.Key("url").String(lokiURL))
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog)
|
api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, tracer)
|
||||||
rawLokiResponse, err := api.RawQuery(ctx, lokiURL)
|
rawLokiResponse, err := api.RawQuery(ctx, lokiURL)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -144,7 +146,10 @@ func callResource(ctx context.Context, req *backend.CallResourceRequest, sender
|
|||||||
|
|
||||||
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
|
||||||
dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
|
dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
|
||||||
|
_, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName]
|
||||||
|
logger := logger.FromContext(ctx).New("api", "QueryData", "fromAlert", fromAlert)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
logger.Error("Failed to get data source info", "err", err)
|
||||||
result := backend.NewQueryDataResponse()
|
result := backend.NewQueryDataResponse()
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
@ -154,21 +159,22 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
|||||||
logsDataplane: s.features.IsEnabled(featuremgmt.FlagLokiLogsDataplane),
|
logsDataplane: s.features.IsEnabled(featuremgmt.FlagLokiLogsDataplane),
|
||||||
}
|
}
|
||||||
|
|
||||||
return queryData(ctx, req, dsInfo, responseOpts, s.tracer)
|
return queryData(ctx, req, dsInfo, responseOpts, s.tracer, logger)
|
||||||
}
|
}
|
||||||
|
|
||||||
func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datasourceInfo, responseOpts ResponseOpts, tracer tracing.Tracer) (*backend.QueryDataResponse, error) {
|
func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datasourceInfo, responseOpts ResponseOpts, tracer tracing.Tracer, plog log.Logger) (*backend.QueryDataResponse, error) {
|
||||||
result := backend.NewQueryDataResponse()
|
result := backend.NewQueryDataResponse()
|
||||||
|
|
||||||
api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, logger.FromContext(ctx))
|
api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, tracer)
|
||||||
|
|
||||||
queries, err := parseQuery(req)
|
queries, err := parseQuery(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
plog.Error("Failed to parse queries", "err", err)
|
||||||
return result, err
|
return result, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, query := range queries {
|
for _, query := range queries {
|
||||||
ctx, span := tracer.Start(ctx, "datasource.loki")
|
ctx, span := tracer.Start(ctx, "datasource.loki.queryData.runQuery")
|
||||||
span.SetAttributes("expr", query.Expr, attribute.Key("expr").String(query.Expr))
|
span.SetAttributes("expr", query.Expr, attribute.Key("expr").String(query.Expr))
|
||||||
span.SetAttributes("start_unixnano", query.Start, attribute.Key("start_unixnano").Int64(query.Start.UnixNano()))
|
span.SetAttributes("start_unixnano", query.Start, attribute.Key("start_unixnano").Int64(query.Start.UnixNano()))
|
||||||
span.SetAttributes("stop_unixnano", query.End, attribute.Key("stop_unixnano").Int64(query.End.UnixNano()))
|
span.SetAttributes("stop_unixnano", query.End, attribute.Key("stop_unixnano").Int64(query.End.UnixNano()))
|
||||||
@ -177,21 +183,20 @@ func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datas
|
|||||||
span.SetAttributes("query_group_id", req.GetHTTPHeader("X-Query-Group-Id"), attribute.Key("query_group_id").String(req.GetHTTPHeader("X-Query-Group-Id")))
|
span.SetAttributes("query_group_id", req.GetHTTPHeader("X-Query-Group-Id"), attribute.Key("query_group_id").String(req.GetHTTPHeader("X-Query-Group-Id")))
|
||||||
}
|
}
|
||||||
|
|
||||||
logger := logger.FromContext(ctx) // get logger with trace-id and other contextual info
|
|
||||||
logger.Debug("Sending query", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr)
|
|
||||||
|
|
||||||
frames, err := runQuery(ctx, api, query, responseOpts)
|
frames, err := runQuery(ctx, api, query, responseOpts)
|
||||||
|
|
||||||
span.End()
|
|
||||||
queryRes := backend.DataResponse{}
|
queryRes := backend.DataResponse{}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.RecordError(err)
|
||||||
|
span.SetStatus(codes.Error, err.Error())
|
||||||
queryRes.Error = err
|
queryRes.Error = err
|
||||||
} else {
|
} else {
|
||||||
queryRes.Frames = frames
|
queryRes.Frames = frames
|
||||||
}
|
}
|
||||||
|
|
||||||
result.Responses[query.RefID] = queryRes
|
result.Responses[query.RefID] = queryRes
|
||||||
|
span.End()
|
||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
@ -200,14 +205,15 @@ func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datas
|
|||||||
func runQuery(ctx context.Context, api *LokiAPI, query *lokiQuery, responseOpts ResponseOpts) (data.Frames, error) {
|
func runQuery(ctx context.Context, api *LokiAPI, query *lokiQuery, responseOpts ResponseOpts) (data.Frames, error) {
|
||||||
frames, err := api.DataQuery(ctx, *query, responseOpts)
|
frames, err := api.DataQuery(ctx, *query, responseOpts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
logger.Error("Error querying loki", "err", err)
|
||||||
return data.Frames{}, err
|
return data.Frames{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, frame := range frames {
|
for _, frame := range frames {
|
||||||
if err = adjustFrame(frame, query, !responseOpts.metricDataplane, responseOpts.logsDataplane); err != nil {
|
err = adjustFrame(frame, query, !responseOpts.metricDataplane, responseOpts.logsDataplane)
|
||||||
return data.Frames{}, err
|
|
||||||
}
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
logger.Error("Error adjusting frame", "err", err)
|
||||||
return data.Frames{}, err
|
return data.Frames{}, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -223,7 +229,7 @@ func (s *Service) getDSInfo(ctx context.Context, pluginCtx backend.PluginContext
|
|||||||
|
|
||||||
instance, ok := i.(*datasourceInfo)
|
instance, ok := i.(*datasourceInfo)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("failed to cast datsource info")
|
return nil, fmt.Errorf("failed to cast data source info")
|
||||||
}
|
}
|
||||||
|
|
||||||
return instance, nil
|
return instance, nil
|
||||||
|
Loading…
Reference in New Issue
Block a user