mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Prometheus: Handle non-json errors in a better way (#99342)
* handle json errors in a better way * update comments * update unit tests * Update pkg/promlib/converter/prom.go Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com> * Update pkg/promlib/querydata/response_test.go Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com> * Update pkg/promlib/querydata/response_test.go Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com> * Update pkg/promlib/querydata/response_test.go Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com> * update import --------- Co-authored-by: Arve Knudsen <arve.knudsen@gmail.com>
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
sdkjsoniter "github.com/grafana/grafana-plugin-sdk-go/data/utils/jsoniter"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/experimental/status"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
)
|
||||
|
||||
@@ -23,7 +24,7 @@ type Options struct {
|
||||
}
|
||||
|
||||
func rspErr(e error) backend.DataResponse {
|
||||
return backend.DataResponse{Error: e}
|
||||
return backend.DataResponse{Error: e, ErrorSource: status.SourceDownstream}
|
||||
}
|
||||
|
||||
// ReadPrometheusStyleResult will read results from a prometheus or loki server and return data frames
|
||||
@@ -39,7 +40,7 @@ func ReadPrometheusStyleResult(jIter *jsoniter.Iterator, opt Options) backend.Da
|
||||
l1Fields:
|
||||
for l1Field, err := iter.ReadObject(); ; l1Field, err = iter.ReadObject() {
|
||||
if err != nil {
|
||||
return rspErr(err)
|
||||
return rspErr(fmt.Errorf("response from prometheus couldn't be parsed. it is non-json: %w", err))
|
||||
}
|
||||
switch l1Field {
|
||||
case "status":
|
||||
|
||||
@@ -3,6 +3,7 @@ package querydata
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strings"
|
||||
@@ -28,28 +29,59 @@ func (s *QueryData) parseResponse(ctx context.Context, q *models.Query, res *htt
|
||||
ctx, endSpan := utils.StartTrace(ctx, s.tracer, "datasource.prometheus.parseResponse")
|
||||
defer endSpan()
|
||||
|
||||
iter := jsoniter.Parse(jsoniter.ConfigDefault, res.Body, 1024)
|
||||
r := converter.ReadPrometheusStyleResult(iter, converter.Options{Dataplane: true})
|
||||
r.Status = backend.Status(res.StatusCode)
|
||||
statusCode := res.StatusCode
|
||||
|
||||
// Add frame to attach metadata
|
||||
if len(r.Frames) == 0 && !q.ExemplarQuery {
|
||||
r.Frames = append(r.Frames, data.NewFrame(""))
|
||||
}
|
||||
switch {
|
||||
// Status codes that Prometheus might return
|
||||
// so we want to parse the response
|
||||
// https://prometheus.io/docs/prometheus/latest/querying/api/#format-overview
|
||||
case statusCode >= http.StatusOK && statusCode < http.StatusMultipleChoices,
|
||||
statusCode == http.StatusBadRequest,
|
||||
statusCode == http.StatusUnprocessableEntity,
|
||||
statusCode == http.StatusServiceUnavailable:
|
||||
|
||||
// The ExecutedQueryString can be viewed in QueryInspector in UI
|
||||
for i, frame := range r.Frames {
|
||||
addMetadataToMultiFrame(q, frame)
|
||||
if i == 0 {
|
||||
frame.Meta.ExecutedQueryString = executedQueryString(q)
|
||||
iter := jsoniter.Parse(jsoniter.ConfigDefault, res.Body, 1024)
|
||||
r := converter.ReadPrometheusStyleResult(iter, converter.Options{Dataplane: true})
|
||||
r.Status = backend.Status(res.StatusCode)
|
||||
|
||||
// Add frame to attach metadata
|
||||
if len(r.Frames) == 0 && !q.ExemplarQuery {
|
||||
r.Frames = append(r.Frames, data.NewFrame(""))
|
||||
}
|
||||
}
|
||||
|
||||
if r.Error == nil {
|
||||
r = s.processExemplars(ctx, q, r)
|
||||
}
|
||||
// The ExecutedQueryString can be viewed in QueryInspector in UI
|
||||
for i, frame := range r.Frames {
|
||||
addMetadataToMultiFrame(q, frame)
|
||||
if i == 0 {
|
||||
frame.Meta.ExecutedQueryString = executedQueryString(q)
|
||||
}
|
||||
}
|
||||
|
||||
return r
|
||||
if r.Error == nil {
|
||||
r = s.processExemplars(ctx, q, r)
|
||||
}
|
||||
|
||||
return r
|
||||
default:
|
||||
// Unknown status code. We don't want to parse the response.
|
||||
const maxBodySize = 1024
|
||||
lr := io.LimitReader(res.Body, maxBodySize)
|
||||
tb, _ := io.ReadAll(lr)
|
||||
|
||||
s.log.FromContext(ctx).Error("Unexpected response received", "status", statusCode, "body", tb)
|
||||
|
||||
errResp := backend.DataResponse{
|
||||
Error: fmt.Errorf("unexpected response with status code %d: %s", statusCode, tb),
|
||||
ErrorSource: backend.ErrorSourceFromHTTPStatus(statusCode),
|
||||
}
|
||||
|
||||
f := data.NewFrame("")
|
||||
addMetadataToMultiFrame(q, f)
|
||||
f.Meta.ExecutedQueryString = executedQueryString(q)
|
||||
errResp.Frames = append(errResp.Frames, f)
|
||||
|
||||
return errResp
|
||||
}
|
||||
}
|
||||
|
||||
func (s *QueryData) processExemplars(ctx context.Context, q *models.Query, dr backend.DataResponse) backend.DataResponse {
|
||||
|
||||
@@ -7,7 +7,9 @@ import (
|
||||
"net/http"
|
||||
"testing"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/grafana/grafana/pkg/promlib/models"
|
||||
"github.com/grafana/grafana/pkg/promlib/querydata/exemplar"
|
||||
@@ -18,7 +20,7 @@ func TestQueryData_parseResponse(t *testing.T) {
|
||||
|
||||
t.Run("resultType is before result the field must parsed normally", func(t *testing.T) {
|
||||
resBody := `{"data":{"resultType":"vector", "result":[{"metric":{"__name__":"some_name","environment":"some_env","id":"some_id","instance":"some_instance:1234","job":"some_job","name":"another_name","region":"some_region"},"value":[1.1,"2"]}]},"status":"success"}`
|
||||
res := &http.Response{Body: io.NopCloser(bytes.NewBufferString(resBody))}
|
||||
res := &http.Response{Body: io.NopCloser(bytes.NewBufferString(resBody)), StatusCode: 200}
|
||||
result := qd.parseResponse(context.Background(), &models.Query{}, res)
|
||||
assert.Nil(t, result.Error)
|
||||
assert.Len(t, result.Frames, 1)
|
||||
@@ -26,7 +28,7 @@ func TestQueryData_parseResponse(t *testing.T) {
|
||||
|
||||
t.Run("resultType is after the result field must parsed normally", func(t *testing.T) {
|
||||
resBody := `{"data":{"result":[{"metric":{"__name__":"some_name","environment":"some_env","id":"some_id","instance":"some_instance:1234","job":"some_job","name":"another_name","region":"some_region"},"value":[1.1,"2"]}],"resultType":"vector"},"status":"success"}`
|
||||
res := &http.Response{Body: io.NopCloser(bytes.NewBufferString(resBody))}
|
||||
res := &http.Response{Body: io.NopCloser(bytes.NewBufferString(resBody)), StatusCode: 200}
|
||||
result := qd.parseResponse(context.Background(), &models.Query{}, res)
|
||||
assert.Nil(t, result.Error)
|
||||
assert.Len(t, result.Frames, 1)
|
||||
@@ -34,7 +36,7 @@ func TestQueryData_parseResponse(t *testing.T) {
|
||||
|
||||
t.Run("no resultType is existed in the data", func(t *testing.T) {
|
||||
resBody := `{"data":{"result":[{"metric":{"__name__":"some_name","environment":"some_env","id":"some_id","instance":"some_instance:1234","job":"some_job","name":"another_name","region":"some_region"},"value":[1.1,"2"]}]},"status":"success"}`
|
||||
res := &http.Response{Body: io.NopCloser(bytes.NewBufferString(resBody))}
|
||||
res := &http.Response{Body: io.NopCloser(bytes.NewBufferString(resBody)), StatusCode: 200}
|
||||
result := qd.parseResponse(context.Background(), &models.Query{}, res)
|
||||
assert.Error(t, result.Error)
|
||||
assert.Equal(t, result.Error.Error(), "no resultType found")
|
||||
@@ -42,7 +44,7 @@ func TestQueryData_parseResponse(t *testing.T) {
|
||||
|
||||
t.Run("resultType is set as empty string before result", func(t *testing.T) {
|
||||
resBody := `{"data":{"resultType":"", "result":[{"metric":{"__name__":"some_name","environment":"some_env","id":"some_id","instance":"some_instance:1234","job":"some_job","name":"another_name","region":"some_region"},"value":[1.1,"2"]}]},"status":"success"}`
|
||||
res := &http.Response{Body: io.NopCloser(bytes.NewBufferString(resBody))}
|
||||
res := &http.Response{Body: io.NopCloser(bytes.NewBufferString(resBody)), StatusCode: 200}
|
||||
result := qd.parseResponse(context.Background(), &models.Query{}, res)
|
||||
assert.Error(t, result.Error)
|
||||
assert.Equal(t, result.Error.Error(), "unknown result type: ")
|
||||
@@ -50,7 +52,7 @@ func TestQueryData_parseResponse(t *testing.T) {
|
||||
|
||||
t.Run("resultType is set as empty string after result", func(t *testing.T) {
|
||||
resBody := `{"data":{"result":[{"metric":{"__name__":"some_name","environment":"some_env","id":"some_id","instance":"some_instance:1234","job":"some_job","name":"another_name","region":"some_region"},"value":[1.1,"2"]}],"resultType":""},"status":"success"}`
|
||||
res := &http.Response{Body: io.NopCloser(bytes.NewBufferString(resBody))}
|
||||
res := &http.Response{Body: io.NopCloser(bytes.NewBufferString(resBody)), StatusCode: 200}
|
||||
result := qd.parseResponse(context.Background(), &models.Query{}, res)
|
||||
assert.Error(t, result.Error)
|
||||
assert.Equal(t, result.Error.Error(), "unknown result type: ")
|
||||
@@ -61,10 +63,47 @@ func TestAddMetadataToMultiFrame(t *testing.T) {
|
||||
t.Run("when you have native histogram result", func(t *testing.T) {
|
||||
qd := QueryData{exemplarSampler: exemplar.NewStandardDeviationSampler}
|
||||
resBody := `{"status":"success","data":{"resultType":"matrix","result":[{"metric":{"__name__":"rpc_durations_native_histogram_seconds","instance":"nativehisto:8080","job":"prometheus"},"histograms":[[1729529685,{"count":"7243102","sum":"72460202.93145595","buckets":[[0,"1.8340080864093422","2","10"],[0,"2","2.1810154653305154","68"]]}],[1729529700,{"count":"7243490","sum":"72464056.03309634","buckets":[[0,"1.8340080864093422","2","10"],[0,"2","2.1810154653305154","68"]]}],[1729529715,{"count":"7243880","sum":"72467935.35871512","buckets":[[0,"1.8340080864093422","2","10"],[0,"2","2.1810154653305154","68"]]}]]}]}}`
|
||||
res := &http.Response{Body: io.NopCloser(bytes.NewBufferString(resBody))}
|
||||
res := &http.Response{Body: io.NopCloser(bytes.NewBufferString(resBody)), StatusCode: 200}
|
||||
result := qd.parseResponse(context.Background(), &models.Query{}, res)
|
||||
assert.Nil(t, result.Error)
|
||||
assert.Len(t, result.Frames, 1)
|
||||
assert.Equal(t, "yMin", result.Frames[0].Fields[1].Name)
|
||||
})
|
||||
}
|
||||
|
||||
// Helper function to create mock HTTP response.
|
||||
func createMockResponse(statusCode int, body string) *http.Response {
|
||||
return &http.Response{
|
||||
StatusCode: statusCode,
|
||||
Body: io.NopCloser(bytes.NewReader([]byte(body))),
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseResponse_ErrorCases(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
statusCode int
|
||||
body string
|
||||
}{
|
||||
{"500 Internal Server Error", http.StatusInternalServerError, `{"error":"internal server error"}`},
|
||||
{"404 Not Found", http.StatusNotFound, `{"error":"not found"}`},
|
||||
{"401 Unauthorized", http.StatusUnauthorized, `{"error":"unauthorized"}`},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
res := createMockResponse(tt.statusCode, tt.body)
|
||||
q := &models.Query{}
|
||||
qd := QueryData{exemplarSampler: exemplar.NewStandardDeviationSampler}
|
||||
qd.log = log.New()
|
||||
resp := qd.parseResponse(ctx, q, res)
|
||||
|
||||
require.Error(t, resp.Error)
|
||||
assert.Contains(t, resp.Error.Error(), "unexpected response")
|
||||
assert.Len(t, resp.Frames, 1)
|
||||
assert.NoError(t, res.Body.Close())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user