diff --git a/pkg/tsdb/loki/api.go b/pkg/tsdb/loki/api.go index 6d04e66bbb8..5ec928a4fdf 100644 --- a/pkg/tsdb/loki/api.go +++ b/pkg/tsdb/loki/api.go @@ -10,9 +10,12 @@ import ( "net/url" "path" "strconv" + "time" "github.com/grafana/grafana-plugin-sdk-go/data" jsoniter "github.com/json-iterator/go" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" @@ -23,6 +26,7 @@ type LokiAPI struct { client *http.Client url string log log.Logger + tracer tracing.Tracer } type RawLokiResponse struct { @@ -31,8 +35,8 @@ type RawLokiResponse struct { Encoding string } -func newLokiAPI(client *http.Client, url string, log log.Logger) *LokiAPI { - return &LokiAPI{client: client, url: url, log: log} +func newLokiAPI(client *http.Client, url string, log log.Logger, tracer tracing.Tracer) *LokiAPI { + return &LokiAPI{client: client, url: url, log: log, tracer: tracer} } func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*http.Request, error) { @@ -154,11 +158,17 @@ func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts return nil, err } + api.log.Debug("Sending query to loki", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr, "queryType", query.QueryType, "direction", query.Direction, "maxLines", query.MaxLines, "supportingQueryType", query.SupportingQueryType, "lokiHost", req.URL.Host, "lokiPath", req.URL.Path) + start := time.Now() + resp, err := api.client.Do(req) if err != nil { return nil, err } + took := time.Since(start) + api.log.Debug("Response received from loki", "took", took, "status", resp.StatusCode, "length", resp.Header.Get("Content-Length")) + defer func() { if err := resp.Body.Close(); err != nil { api.log.Warn("Failed to close response body", "err", err) @@ -166,16 +176,29 @@ func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery, responseOpts }() if resp.StatusCode/100 != 2 { - return nil, readLokiError(resp.Body) + err := readLokiError(resp.Body) + api.log.Error("Error received from loki", "err", err, "status", resp.StatusCode) + return nil, err } + start = time.Now() + _, span := api.tracer.Start(ctx, "datasource.loki.parseResponse") + span.SetAttributes("metricDataplane", responseOpts.metricDataplane, attribute.Key("metricDataplane").Bool(responseOpts.metricDataplane)) + defer span.End() + iter := jsoniter.Parse(jsoniter.ConfigDefault, resp.Body, 1024) res := converter.ReadPrometheusStyleResult(iter, converter.Options{Dataplane: responseOpts.metricDataplane}) + took = time.Since(start) if res.Error != nil { + span.RecordError(res.Error) + span.SetStatus(codes.Error, err.Error()) + logger.Error("Error parsing response from loki", "err", res.Error, "metricDataplane", responseOpts.metricDataplane, "took", took) return nil, res.Error } + logger.Debug("Response parsed from loki", "took", took, "metricDataplane", responseOpts.metricDataplane, "framesLength", len(res.Frames)) + return res.Frames, nil } diff --git a/pkg/tsdb/loki/api_mock.go b/pkg/tsdb/loki/api_mock.go index 0548f798c61..d3acb40bf16 100644 --- a/pkg/tsdb/loki/api_mock.go +++ b/pkg/tsdb/loki/api_mock.go @@ -6,6 +6,7 @@ import ( "net/http" "github.com/grafana/grafana/pkg/infra/log" + "github.com/grafana/grafana/pkg/infra/tracing" ) type mockRequestCallback func(req *http.Request) @@ -64,7 +65,7 @@ func makeMockedAPIWithUrl(url string, statusCode int, contentType string, respon Transport: &mockedRoundTripper{statusCode: statusCode, contentType: contentType, responseBytes: responseBytes, requestCallback: requestCallback}, } - return newLokiAPI(&client, url, log.New("test")) + return newLokiAPI(&client, url, log.New("test"), tracing.NewFakeTracer()) } func makeCompressedMockedAPIWithUrl(url string, statusCode int, contentType string, responseBytes []byte, requestCallback mockRequestCallback) *LokiAPI { @@ -72,5 +73,5 @@ func makeCompressedMockedAPIWithUrl(url string, statusCode int, contentType stri Transport: &mockedCompressedRoundTripper{statusCode: statusCode, contentType: contentType, responseBytes: responseBytes, requestCallback: requestCallback}, } - return newLokiAPI(&client, url, log.New("test")) + return newLokiAPI(&client, url, log.New("test"), tracing.NewFakeTracer()) } diff --git a/pkg/tsdb/loki/frame.go b/pkg/tsdb/loki/frame.go index aa9cc3f3209..aa40633291a 100644 --- a/pkg/tsdb/loki/frame.go +++ b/pkg/tsdb/loki/frame.go @@ -35,14 +35,14 @@ func adjustMetricFrame(frame *data.Frame, query *lokiQuery, setFrameName bool) e fields := frame.Fields // we check if the fields are of correct type if len(fields) != 2 { - return fmt.Errorf("invalid fields in metric frame") + return fmt.Errorf("invalid field length in metric frame. expected 2, got %d", len(fields)) } timeField := fields[0] valueField := fields[1] if (timeField.Type() != data.FieldTypeTime) || (valueField.Type() != data.FieldTypeFloat64) { - return fmt.Errorf("invalid fields in metric frame") + return fmt.Errorf("invalid field types in metric frame. expected time and float64, got %s and %s", timeField.Type(), valueField.Type()) } labels := getFrameLabels(frame) @@ -94,7 +94,7 @@ func adjustLegacyLogsFrame(frame *data.Frame, query *lokiQuery) error { // we check if the fields are of correct type and length fields := frame.Fields if len(fields) != 4 { - return fmt.Errorf("invalid fields in logs frame") + return fmt.Errorf("invalid field length in logs frame. expected 4, got %d", len(fields)) } labelsField := fields[0] @@ -103,11 +103,11 @@ func adjustLegacyLogsFrame(frame *data.Frame, query *lokiQuery) error { stringTimeField := fields[3] if (timeField.Type() != data.FieldTypeTime) || (lineField.Type() != data.FieldTypeString) || (labelsField.Type() != data.FieldTypeJSON) || (stringTimeField.Type() != data.FieldTypeString) { - return fmt.Errorf("invalid fields in logs frame") + return fmt.Errorf("invalid field types in logs frame. expected time, string, json and string, got %s, %s, %s and %s", timeField.Type(), lineField.Type(), labelsField.Type(), stringTimeField.Type()) } if (timeField.Len() != lineField.Len()) || (timeField.Len() != labelsField.Len()) || (timeField.Len() != stringTimeField.Len()) { - return fmt.Errorf("invalid fields in logs frame") + return fmt.Errorf("indifferent field lengths in logs frame. expected all to be equal, got %d, %d, %d and %d", timeField.Len(), lineField.Len(), labelsField.Len(), stringTimeField.Len()) } // this returns an error when the length of fields do not match @@ -150,7 +150,7 @@ func adjustDataplaneLogsFrame(frame *data.Frame, query *lokiQuery) error { // we check if the fields are of correct type and length fields := frame.Fields if len(fields) != 4 { - return fmt.Errorf("invalid fields in logs frame") + return fmt.Errorf("invalid field length in logs frame. expected 4, got %d", len(fields)) } labelsField := fields[0] @@ -159,11 +159,11 @@ func adjustDataplaneLogsFrame(frame *data.Frame, query *lokiQuery) error { stringTimeField := fields[3] if (timeField.Type() != data.FieldTypeTime) || (lineField.Type() != data.FieldTypeString) || (labelsField.Type() != data.FieldTypeJSON) || (stringTimeField.Type() != data.FieldTypeString) { - return fmt.Errorf("invalid fields in logs frame") + return fmt.Errorf("invalid field types in logs frame. expected time, string, json and string, got %s, %s, %s and %s", timeField.Type(), lineField.Type(), labelsField.Type(), stringTimeField.Type()) } if (timeField.Len() != lineField.Len()) || (timeField.Len() != labelsField.Len()) || (timeField.Len() != stringTimeField.Len()) { - return fmt.Errorf("invalid fields in logs frame") + return fmt.Errorf("indifferent field lengths in logs frame. expected all to be equal, got %d, %d, %d and %d", timeField.Len(), lineField.Len(), labelsField.Len(), stringTimeField.Len()) } // this returns an error when the length of fields do not match diff --git a/pkg/tsdb/loki/loki.go b/pkg/tsdb/loki/loki.go index ca96e51c3b5..b481a736465 100644 --- a/pkg/tsdb/loki/loki.go +++ b/pkg/tsdb/loki/loki.go @@ -14,11 +14,13 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt" "github.com/grafana/grafana-plugin-sdk-go/data" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "github.com/grafana/grafana/pkg/infra/httpclient" "github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/tracing" "github.com/grafana/grafana/pkg/services/featuremgmt" + ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models" "github.com/grafana/grafana/pkg/tsdb/loki/kinds/dataquery" ) @@ -122,7 +124,7 @@ func callResource(ctx context.Context, req *backend.CallResourceRequest, sender span.SetAttributes("url", lokiURL, attribute.Key("url").String(lokiURL)) defer span.End() - api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog) + api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, tracer) rawLokiResponse, err := api.RawQuery(ctx, lokiURL) if err != nil { @@ -144,7 +146,10 @@ func callResource(ctx context.Context, req *backend.CallResourceRequest, sender func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { dsInfo, err := s.getDSInfo(ctx, req.PluginContext) + _, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName] + logger := logger.FromContext(ctx).New("api", "QueryData", "fromAlert", fromAlert) if err != nil { + logger.Error("Failed to get data source info", "err", err) result := backend.NewQueryDataResponse() return result, err } @@ -154,21 +159,22 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) logsDataplane: s.features.IsEnabled(featuremgmt.FlagLokiLogsDataplane), } - return queryData(ctx, req, dsInfo, responseOpts, s.tracer) + return queryData(ctx, req, dsInfo, responseOpts, s.tracer, logger) } -func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datasourceInfo, responseOpts ResponseOpts, tracer tracing.Tracer) (*backend.QueryDataResponse, error) { +func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datasourceInfo, responseOpts ResponseOpts, tracer tracing.Tracer, plog log.Logger) (*backend.QueryDataResponse, error) { result := backend.NewQueryDataResponse() - api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, logger.FromContext(ctx)) + api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, tracer) queries, err := parseQuery(req) if err != nil { + plog.Error("Failed to parse queries", "err", err) return result, err } for _, query := range queries { - ctx, span := tracer.Start(ctx, "datasource.loki") + ctx, span := tracer.Start(ctx, "datasource.loki.queryData.runQuery") span.SetAttributes("expr", query.Expr, attribute.Key("expr").String(query.Expr)) span.SetAttributes("start_unixnano", query.Start, attribute.Key("start_unixnano").Int64(query.Start.UnixNano())) span.SetAttributes("stop_unixnano", query.End, attribute.Key("stop_unixnano").Int64(query.End.UnixNano())) @@ -177,21 +183,20 @@ func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datas span.SetAttributes("query_group_id", req.GetHTTPHeader("X-Query-Group-Id"), attribute.Key("query_group_id").String(req.GetHTTPHeader("X-Query-Group-Id"))) } - logger := logger.FromContext(ctx) // get logger with trace-id and other contextual info - logger.Debug("Sending query", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr) - frames, err := runQuery(ctx, api, query, responseOpts) - span.End() queryRes := backend.DataResponse{} if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) queryRes.Error = err } else { queryRes.Frames = frames } result.Responses[query.RefID] = queryRes + span.End() } return result, nil } @@ -200,14 +205,15 @@ func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datas func runQuery(ctx context.Context, api *LokiAPI, query *lokiQuery, responseOpts ResponseOpts) (data.Frames, error) { frames, err := api.DataQuery(ctx, *query, responseOpts) if err != nil { + logger.Error("Error querying loki", "err", err) return data.Frames{}, err } for _, frame := range frames { - if err = adjustFrame(frame, query, !responseOpts.metricDataplane, responseOpts.logsDataplane); err != nil { - return data.Frames{}, err - } + err = adjustFrame(frame, query, !responseOpts.metricDataplane, responseOpts.logsDataplane) + if err != nil { + logger.Error("Error adjusting frame", "err", err) return data.Frames{}, err } } @@ -223,7 +229,7 @@ func (s *Service) getDSInfo(ctx context.Context, pluginCtx backend.PluginContext instance, ok := i.(*datasourceInfo) if !ok { - return nil, fmt.Errorf("failed to cast datsource info") + return nil, fmt.Errorf("failed to cast data source info") } return instance, nil