Loki: Add error handling to CallResource (#64439)

* surface loki error message for `CallResource`

* use `data.Message` instead or `errorMessage`

* change struct coming from Loki

* remove whitespace
This commit is contained in:
Sven Grossmann 2023-03-09 11:12:33 +01:00 committed by GitHub
parent 1898e76dd6
commit 473013e3f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 98 additions and 28 deletions

View File

@ -15,6 +15,7 @@ import (
jsoniter "github.com/json-iterator/go"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/util/converter"
)
@ -26,6 +27,7 @@ type LokiAPI struct {
type RawLokiResponse struct {
Body []byte
Status int
Encoding string
}
@ -96,14 +98,35 @@ func makeDataRequest(ctx context.Context, lokiDsUrl string, query lokiQuery) (*h
return req, nil
}
type lokiResponseError struct {
Message string `json:"message"`
TraceID string `json:"traceID,omitempty"`
}
type lokiError struct {
Message string
}
func makeLokiError(bytes []byte) error {
var data lokiError
err := json.Unmarshal(bytes, &data)
if err != nil {
// we were unable to convert the bytes to JSON, we return the whole text
return fmt.Errorf("%v", string(bytes))
}
if data.Message == "" {
// we got no usable error message, we return the whole text
return fmt.Errorf("%v", string(bytes))
}
return fmt.Errorf("%v", data.Message)
}
// we know there is an error,
// based on the http-response-body
// we have to make an informative error-object
func makeLokiError(body io.ReadCloser) error {
func readLokiError(body io.ReadCloser) error {
var buf bytes.Buffer
_, err := buf.ReadFrom(body)
if err != nil {
@ -122,21 +145,7 @@ func makeLokiError(body io.ReadCloser) error {
// - we take the value of the field "message"
// - if any of these steps fail, or if "message" is empty, we return the whole text
var data lokiError
err = json.Unmarshal(bytes, &data)
if err != nil {
// we were unable to convert the bytes to JSON, we return the whole text
return fmt.Errorf("%v", string(bytes))
}
errorMessage := data.Message
if errorMessage == "" {
// we got no usable error message, we return the whole text
return fmt.Errorf("%v", string(bytes))
}
return fmt.Errorf("%v", errorMessage)
return makeLokiError(bytes)
}
func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery) (data.Frames, error) {
@ -157,7 +166,7 @@ func (api *LokiAPI) DataQuery(ctx context.Context, query lokiQuery) (data.Frames
}()
if resp.StatusCode/100 != 2 {
return nil, makeLokiError(resp.Body)
return nil, readLokiError(resp.Body)
}
iter := jsoniter.Parse(jsoniter.ConfigDefault, resp.Body, 1024)
@ -211,8 +220,9 @@ func (api *LokiAPI) RawQuery(ctx context.Context, resourcePath string) (RawLokiR
}
}()
if resp.StatusCode/100 != 2 {
return RawLokiResponse{}, makeLokiError(resp.Body)
// server errors are handled by the plugin-proxy to hide the error message
if resp.StatusCode/100 == 5 {
return RawLokiResponse{}, readLokiError(resp.Body)
}
body, err := io.ReadAll(resp.Body)
@ -220,12 +230,26 @@ func (api *LokiAPI) RawQuery(ctx context.Context, resourcePath string) (RawLokiR
return RawLokiResponse{}, err
}
encodedBytes := RawLokiResponse{
// client errors are passed as a json struct to the client
if resp.StatusCode/100 != 2 {
lokiResponseErr := lokiResponseError{Message: makeLokiError(body).Error()}
traceID := tracing.TraceIDFromContext(ctx, false)
if traceID != "" {
lokiResponseErr.TraceID = traceID
}
body, err = json.Marshal(lokiResponseErr)
if err != nil {
return RawLokiResponse{}, err
}
}
rawLokiResponse := RawLokiResponse{
Body: body,
Status: resp.StatusCode,
Encoding: resp.Header.Get("Content-Encoding"),
}
return encodedBytes, nil
return rawLokiResponse, nil
}
func getSupportingQueryHeaderValue(req *http.Request, supportingQueryType SupportingQueryType) string {

View File

@ -188,4 +188,46 @@ func TestApiReturnValues(t *testing.T) {
require.Equal(t, "gzip", encodedBytes.Encoding)
require.Equal(t, []byte("foo"), encodedBytes.Body)
})
t.Run("Loki should return the error as message", func(t *testing.T) {
called := false
api := makeCompressedMockedAPIWithUrl("http://localhost:3100", 400, "application/json", []byte("foo"), func(req *http.Request) {
called = true
})
encodedBytes, err := api.RawQuery(context.Background(), "/loki/api/v1/labels?start=1&end=2")
require.NoError(t, err)
require.True(t, called)
require.Equal(t, "gzip", encodedBytes.Encoding)
require.Equal(t, []byte("{\"message\":\"foo\"}"), encodedBytes.Body)
})
t.Run("Loki should return the error as is", func(t *testing.T) {
called := false
api := makeCompressedMockedAPIWithUrl("http://localhost:3100", 400, "application/json", []byte("{\"message\":\"foo\"}"), func(req *http.Request) {
called = true
})
encodedBytes, err := api.RawQuery(context.Background(), "/loki/api/v1/labels?start=1&end=2")
require.NoError(t, err)
require.True(t, called)
require.Equal(t, "gzip", encodedBytes.Encoding)
require.Equal(t, []byte("{\"message\":\"foo\"}"), encodedBytes.Body)
})
t.Run("Loki should not return the error on 500", func(t *testing.T) {
api := makeCompressedMockedAPIWithUrl("http://localhost:3100", 500, "application/json", []byte("foo"), nil)
_, err := api.RawQuery(context.Background(), "/loki/api/v1/labels?start=1&end=2")
require.Error(t, err)
require.ErrorContains(t, err, "foo")
})
t.Run("Loki should not return the error on 500 in JSON", func(t *testing.T) {
api := makeCompressedMockedAPIWithUrl("http://localhost:3100", 500, "application/json", []byte("{\"message\":\"foo\"}"), nil)
_, err := api.RawQuery(context.Background(), "/loki/api/v1/labels?start=1&end=2")
require.Error(t, err)
require.ErrorContains(t, err, "foo")
})
}

View File

@ -95,10 +95,10 @@ func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceReq
if err != nil {
return err
}
return callResource(ctx, req, sender, dsInfo, logger.FromContext(ctx))
return callResource(ctx, req, sender, dsInfo, logger.FromContext(ctx), s.tracer)
}
func callResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender, dsInfo *datasourceInfo, plog log.Logger) error {
func callResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender, dsInfo *datasourceInfo, plog log.Logger, tracer tracing.Tracer) error {
url := req.URL
// a very basic is-this-url-valid check
@ -113,8 +113,12 @@ func callResource(ctx context.Context, req *backend.CallResourceRequest, sender
}
lokiURL := fmt.Sprintf("/loki/api/v1/%s", url)
ctx, span := tracer.Start(ctx, "datasource.loki.CallResource")
span.SetAttributes("url", lokiURL, attribute.Key("url").String(lokiURL))
defer span.End()
api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog)
encodedBytes, err := api.RawQuery(ctx, lokiURL)
rawLokiResponse, err := api.RawQuery(ctx, lokiURL)
if err != nil {
return err
@ -123,13 +127,13 @@ func callResource(ctx context.Context, req *backend.CallResourceRequest, sender
respHeaders := map[string][]string{
"content-type": {"application/json"},
}
if encodedBytes.Encoding != "" {
respHeaders["content-encoding"] = []string{encodedBytes.Encoding}
if rawLokiResponse.Encoding != "" {
respHeaders["content-encoding"] = []string{rawLokiResponse.Encoding}
}
return sender.Send(&backend.CallResourceResponse{
Status: http.StatusOK,
Status: rawLokiResponse.Status,
Headers: respHeaders,
Body: encodedBytes.Body,
Body: rawLokiResponse.Body,
})
}