diff --git a/pkg/tests/api/prometheus/prometheus_test.go b/pkg/tests/api/prometheus/prometheus_test.go index 0192df41c26..f80d8d38d58 100644 --- a/pkg/tests/api/prometheus/prometheus_test.go +++ b/pkg/tests/api/prometheus/prometheus_test.go @@ -5,7 +5,6 @@ import ( "context" "encoding/json" "fmt" - "io" "net/http" "net/http/httptest" "testing" @@ -74,7 +73,7 @@ func TestIntegrationPrometheus(t *testing.T) { "datasource": map[string]interface{}{ "uid": uid, }, - "expr": "up", + "expr": "1", "instantQuery": true, }) buf1 := &bytes.Buffer{} @@ -88,13 +87,9 @@ func TestIntegrationPrometheus(t *testing.T) { // nolint:gosec resp, err := http.Post(u, "application/json", buf1) require.NoError(t, err) - require.Equal(t, http.StatusOK, resp.StatusCode) t.Cleanup(func() { - err := resp.Body.Close() - require.NoError(t, err) + _ = resp.Body.Close() }) - _, err = io.ReadAll(resp.Body) - require.NoError(t, err) require.NotNil(t, outgoingRequest) require.Equal(t, "/api/v1/query_range?q1=1&q2=2", outgoingRequest.URL.String()) @@ -124,13 +119,9 @@ func TestIntegrationPrometheus(t *testing.T) { // nolint:gosec resp, err := http.Post(u, "application/json", buf1) require.NoError(t, err) - require.Equal(t, http.StatusOK, resp.StatusCode) t.Cleanup(func() { - err := resp.Body.Close() - require.NoError(t, err) + _ = resp.Body.Close() }) - _, err = io.ReadAll(resp.Body) - require.NoError(t, err) require.NotNil(t, outgoingRequest) require.Equal(t, "/api/v1/query_range", outgoingRequest.URL.Path) diff --git a/pkg/tsdb/prometheus/healthcheck_test.go b/pkg/tsdb/prometheus/healthcheck_test.go index e4d8d060821..9dbf92c8cbb 100644 --- a/pkg/tsdb/prometheus/healthcheck_test.go +++ b/pkg/tsdb/prometheus/healthcheck_test.go @@ -2,7 +2,9 @@ package prometheus import ( "context" + "io" "net/http" + "strings" "testing" "time" @@ -27,10 +29,19 @@ type healthCheckFailRoundTripper struct { func (rt *healthCheckSuccessRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { return &http.Response{ - Status: "200", - StatusCode: 200, - Header: nil, - Body: nil, + Status: "200", + StatusCode: 200, + Header: nil, + Body: io.NopCloser(strings.NewReader(`{ + "status": "success", + "data": { + "resultType": "scalar", + "result": [ + 1692969348.331, + "2" + ] + } + }`)), ContentLength: 0, Request: req, }, nil diff --git a/pkg/util/converter/jsonitere/jsonitere.go b/pkg/util/converter/jsonitere/jsonitere.go new file mode 100644 index 00000000000..383a5b08a20 --- /dev/null +++ b/pkg/util/converter/jsonitere/jsonitere.go @@ -0,0 +1,60 @@ +// package jsonitere wraps json-iterator/go's Iterator methods with error returns +// so linting can catch unchecked errors. +// The underlying iterator's Error property is returned and not reset. +// See json-iterator/go for method documentation and additional methods that +// can be added to this library. +package jsonitere + +import j "github.com/json-iterator/go" + +type Iterator struct { + // named property instead of embedded so there is no + // confusion about which method or property is called + i *j.Iterator +} + +func NewIterator(i *j.Iterator) *Iterator { + return &Iterator{i} +} + +func (iter *Iterator) Read() (interface{}, error) { + return iter.i.Read(), iter.i.Error +} + +func (iter *Iterator) ReadAny() (j.Any, error) { + return iter.i.ReadAny(), iter.i.Error +} + +func (iter *Iterator) ReadArray() (bool, error) { + return iter.i.ReadArray(), iter.i.Error +} + +func (iter *Iterator) ReadObject() (string, error) { + return iter.i.ReadObject(), iter.i.Error +} + +func (iter *Iterator) ReadString() (string, error) { + return iter.i.ReadString(), iter.i.Error +} + +func (iter *Iterator) WhatIsNext() (j.ValueType, error) { + return iter.i.WhatIsNext(), iter.i.Error +} + +func (iter *Iterator) Skip() error { + iter.i.Skip() + return iter.i.Error +} + +func (iter *Iterator) ReadVal(obj interface{}) error { + iter.i.ReadVal(obj) + return iter.i.Error +} + +func (iter *Iterator) ReadFloat64() (float64, error) { + return iter.i.ReadFloat64(), iter.i.Error +} + +func (iter *Iterator) ReadInt8() (int8, error) { + return iter.i.ReadInt8(), iter.i.Error +} diff --git a/pkg/util/converter/prom.go b/pkg/util/converter/prom.go index 6b250323d86..df13bdca2f2 100644 --- a/pkg/util/converter/prom.go +++ b/pkg/util/converter/prom.go @@ -8,6 +8,7 @@ import ( "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/util/converter/jsonitere" jsoniter "github.com/json-iterator/go" ) @@ -20,40 +21,70 @@ type Options struct { Dataplane bool } +func rspErr(e error) backend.DataResponse { + return backend.DataResponse{Error: e} +} + // ReadPrometheusStyleResult will read results from a prometheus or loki server and return data frames -func ReadPrometheusStyleResult(iter *jsoniter.Iterator, opt Options) backend.DataResponse { +func ReadPrometheusStyleResult(jIter *jsoniter.Iterator, opt Options) backend.DataResponse { + iter := jsonitere.NewIterator(jIter) var rsp backend.DataResponse status := "unknown" errorType := "" - err := "" + promErrString := "" warnings := []data.Notice{} - for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() { +l1Fields: + for l1Field, err := iter.ReadObject(); ; l1Field, err = iter.ReadObject() { + if err != nil { + return rspErr(err) + } switch l1Field { case "status": - status = iter.ReadString() + if status, err = iter.ReadString(); err != nil { + return rspErr(err) + } case "data": rsp = readPrometheusData(iter, opt) + if rsp.Error != nil { + return rsp + } case "error": - err = iter.ReadString() + if promErrString, err = iter.ReadString(); err != nil { + return rspErr(err) + } case "errorType": - errorType = iter.ReadString() + if errorType, err = iter.ReadString(); err != nil { + return rspErr(err) + } case "warnings": - warnings = readWarnings(iter) + if warnings, err = readWarnings(iter); err != nil { + return rspErr(err) + } + + case "": + if err != nil { + return rspErr(err) + } + break l1Fields default: - v := iter.Read() + v, err := iter.Read() + if err != nil { + rsp.Error = err + return rsp + } logf("[ROOT] TODO, support key: %s / %v\n", l1Field, v) } } if status == "error" { return backend.DataResponse{ - Error: fmt.Errorf("%s: %s", errorType, err), + Error: fmt.Errorf("%s: %s", errorType, promErrString), } } @@ -69,27 +100,48 @@ func ReadPrometheusStyleResult(iter *jsoniter.Iterator, opt Options) backend.Dat return rsp } -func readWarnings(iter *jsoniter.Iterator) []data.Notice { +func readWarnings(iter *jsonitere.Iterator) ([]data.Notice, error) { warnings := []data.Notice{} - if iter.WhatIsNext() != jsoniter.ArrayValue { - return warnings + next, err := iter.WhatIsNext() + if err != nil { + return nil, err } - for iter.ReadArray() { - if iter.WhatIsNext() == jsoniter.StringValue { + if next != jsoniter.ArrayValue { + return warnings, nil + } + + for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() { + if err != nil { + return nil, err + } + next, err := iter.WhatIsNext() + if err != nil { + return nil, err + } + if next == jsoniter.StringValue { + s, err := iter.ReadString() + if err != nil { + return nil, err + } notice := data.Notice{ Severity: data.NoticeSeverityWarning, - Text: iter.ReadString(), + Text: s, } warnings = append(warnings, notice) } } - return warnings + return warnings, nil } -func readPrometheusData(iter *jsoniter.Iterator, opt Options) backend.DataResponse { - t := iter.WhatIsNext() +func readPrometheusData(iter *jsonitere.Iterator, opt Options) backend.DataResponse { + var rsp backend.DataResponse + t, err := iter.WhatIsNext() + if err != nil { + return rspErr(err) + } + if t == jsoniter.ArrayValue { return readArrayData(iter) } @@ -101,32 +153,54 @@ func readPrometheusData(iter *jsoniter.Iterator, opt Options) backend.DataRespon } resultType := "" - var rsp backend.DataResponse - for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() { +l1Fields: + for l1Field, err := iter.ReadObject(); ; l1Field, err = iter.ReadObject() { + if err != nil { + return rspErr(err) + } switch l1Field { case "resultType": - resultType = iter.ReadString() - + resultType, err = iter.ReadString() + if err != nil { + return rspErr(err) + } case "result": switch resultType { case "matrix", "vector": rsp = readMatrixOrVectorMulti(iter, resultType, opt) + if rsp.Error != nil { + return rsp + } case "streams": rsp = readStream(iter) + if rsp.Error != nil { + return rsp + } case "string": rsp = readString(iter) + if rsp.Error != nil { + return rsp + } case "scalar": rsp = readScalar(iter) + if rsp.Error != nil { + return rsp + } default: - iter.Skip() + if err = iter.Skip(); err != nil { + return rspErr(err) + } rsp = backend.DataResponse{ Error: fmt.Errorf("unknown result type: %s", resultType), } } case "stats": - v := iter.Read() + v, err := iter.Read() + if err != nil { + rspErr(err) + } if len(rsp.Frames) > 0 { meta := rsp.Frames[0].Meta if meta == nil { @@ -138,8 +212,17 @@ func readPrometheusData(iter *jsoniter.Iterator, opt Options) backend.DataRespon } } + case "": + if err != nil { + return rspErr(err) + } + break l1Fields + default: - v := iter.Read() + v, err := iter.Read() + if err != nil { + return rspErr(err) + } logf("[data] TODO, support key: %s / %v\n", l1Field, v) } } @@ -148,21 +231,38 @@ func readPrometheusData(iter *jsoniter.Iterator, opt Options) backend.DataRespon } // will return strings or exemplars -func readArrayData(iter *jsoniter.Iterator) backend.DataResponse { +func readArrayData(iter *jsonitere.Iterator) backend.DataResponse { lookup := make(map[string]*data.Field) var labelFrame *data.Frame rsp := backend.DataResponse{} + stringField := data.NewFieldFromFieldType(data.FieldTypeString, 0) stringField.Name = "Value" - for iter.ReadArray() { - switch iter.WhatIsNext() { + for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() { + if err != nil { + rspErr(err) + } + + next, err := iter.WhatIsNext() + if err != nil { + return rspErr(err) + } + + switch next { case jsoniter.StringValue: - stringField.Append(iter.ReadString()) + s, err := iter.ReadString() + if err != nil { + return rspErr(err) + } + stringField.Append(s) // Either label or exemplars case jsoniter.ObjectValue: - exemplar, labelPairs := readLabelsOrExemplars(iter) + exemplar, labelPairs, err := readLabelsOrExemplars(iter) + if err != nil { + rspErr(err) + } if exemplar != nil { rsp.Frames = append(rsp.Frames, exemplar) } else if labelPairs != nil { @@ -199,7 +299,10 @@ func readArrayData(iter *jsoniter.Iterator) backend.DataResponse { default: { - ext := iter.ReadAny() + ext, err := iter.ReadAny() + if err != nil { + rspErr(err) + } v := fmt.Sprintf("%v", ext) stringField.Append(v) } @@ -214,23 +317,38 @@ func readArrayData(iter *jsoniter.Iterator) backend.DataResponse { } // For consistent ordering read values to an array not a map -func readLabelsAsPairs(iter *jsoniter.Iterator) [][2]string { +func readLabelsAsPairs(iter *jsonitere.Iterator) ([][2]string, error) { pairs := make([][2]string, 0, 10) - for k := iter.ReadObject(); k != ""; k = iter.ReadObject() { - pairs = append(pairs, [2]string{k, iter.ReadString()}) + for k, err := iter.ReadObject(); k != ""; k, err = iter.ReadObject() { + if err != nil { + return nil, err + } + v, err := iter.ReadString() + if err != nil { + return nil, err + } + pairs = append(pairs, [2]string{k, v}) } - return pairs + return pairs, nil } -func readLabelsOrExemplars(iter *jsoniter.Iterator) (*data.Frame, [][2]string) { +func readLabelsOrExemplars(iter *jsonitere.Iterator) (*data.Frame, [][2]string, error) { pairs := make([][2]string, 0, 10) labels := data.Labels{} var frame *data.Frame - for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() { +l1Fields: + for l1Field, err := iter.ReadObject(); ; l1Field, err = iter.ReadObject() { + if err != nil { + return nil, nil, err + } switch l1Field { case "seriesLabels": - iter.ReadVal(&labels) + err = iter.ReadVal(&labels) + if err != nil { + return nil, nil, err + } + case "exemplars": lookup := make(map[string]*data.Field) timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0) @@ -242,21 +360,42 @@ func readLabelsOrExemplars(iter *jsoniter.Iterator) (*data.Frame, [][2]string) { frame.Meta = &data.FrameMeta{ Custom: resultTypeToCustomMeta("exemplar"), } - for iter.ReadArray() { - for l2Field := iter.ReadObject(); l2Field != ""; l2Field = iter.ReadObject() { + for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() { + if err != nil { + return nil, nil, err + } + for l2Field, err := iter.ReadObject(); l2Field != ""; l2Field, err = iter.ReadObject() { + if err != nil { + return nil, nil, err + } switch l2Field { // nolint:goconst case "value": - v, _ := strconv.ParseFloat(iter.ReadString(), 64) + s, err := iter.ReadString() + if err != nil { + return nil, nil, err + } + v, err := strconv.ParseFloat(s, 64) + if err != nil { + return nil, nil, err + } valueField.Append(v) case "timestamp": - ts := timeFromFloat(iter.ReadFloat64()) + f, err := iter.ReadFloat64() + if err != nil { + return nil, nil, err + } + ts := timeFromFloat(f) timeField.Append(ts) case "labels": max := 0 - for _, pair := range readLabelsAsPairs(iter) { + pairs, err := readLabelsAsPairs(iter) + if err != nil { + return nil, nil, err + } + for _, pair := range pairs { k := pair[0] v := pair[1] f, ok := lookup[k] @@ -281,7 +420,10 @@ func readLabelsOrExemplars(iter *jsoniter.Iterator) (*data.Frame, [][2]string) { } default: - iter.Skip() + if err = iter.Skip(); err != nil { + return nil, nil, err + } + frame.AppendNotices(data.Notice{ Severity: data.NoticeSeverityError, Text: fmt.Sprintf("unable to parse key: %s in response body", l2Field), @@ -289,27 +431,54 @@ func readLabelsOrExemplars(iter *jsoniter.Iterator) (*data.Frame, [][2]string) { } } } + case "": + if err != nil { + return nil, nil, err + } + break l1Fields + default: - v := fmt.Sprintf("%v", iter.Read()) + iV, err := iter.Read() + if err != nil { + return nil, nil, err + } + v := fmt.Sprintf("%v", iV) pairs = append(pairs, [2]string{l1Field, v}) } } - return frame, pairs + return frame, pairs, nil } -func readString(iter *jsoniter.Iterator) backend.DataResponse { +func readString(iter *jsonitere.Iterator) backend.DataResponse { timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0) timeField.Name = data.TimeSeriesTimeFieldName valueField := data.NewFieldFromFieldType(data.FieldTypeString, 0) valueField.Name = data.TimeSeriesValueFieldName valueField.Labels = data.Labels{} - iter.ReadArray() - t := iter.ReadFloat64() - iter.ReadArray() - v := iter.ReadString() - iter.ReadArray() + _, err := iter.ReadArray() + if err != nil { + return rspErr(err) + } + + var t float64 + if t, err = iter.ReadFloat64(); err != nil { + return rspErr(err) + } + + if _, err = iter.ReadArray(); err != nil { + return rspErr(err) + } + + var v string + if v, err = iter.ReadString(); err != nil { + return rspErr(err) + } + + if _, err = iter.ReadArray(); err != nil { + return rspErr(err) + } tt := timeFromFloat(t) timeField.Append(tt) @@ -326,7 +495,9 @@ func readString(iter *jsoniter.Iterator) backend.DataResponse { } } -func readScalar(iter *jsoniter.Iterator) backend.DataResponse { +func readScalar(iter *jsonitere.Iterator) backend.DataResponse { + rsp := backend.DataResponse{} + timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0) timeField.Name = data.TimeSeriesTimeFieldName valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, 0) @@ -334,10 +505,12 @@ func readScalar(iter *jsoniter.Iterator) backend.DataResponse { valueField.Labels = data.Labels{} t, v, err := readTimeValuePair(iter) - if err == nil { - timeField.Append(t) - valueField.Append(v) + if err != nil { + rsp.Error = err + return rsp } + timeField.Append(t) + valueField.Append(v) frame := data.NewFrame("", timeField, valueField) frame.Meta = &data.FrameMeta{ @@ -350,10 +523,13 @@ func readScalar(iter *jsoniter.Iterator) backend.DataResponse { } } -func readMatrixOrVectorMulti(iter *jsoniter.Iterator, resultType string, opt Options) backend.DataResponse { +func readMatrixOrVectorMulti(iter *jsonitere.Iterator, resultType string, opt Options) backend.DataResponse { rsp := backend.DataResponse{} - for iter.ReadArray() { + for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() { + if err != nil { + return rspErr(err) + } timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0) timeField.Name = data.TimeSeriesTimeFieldName valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, 0) @@ -362,50 +538,64 @@ func readMatrixOrVectorMulti(iter *jsoniter.Iterator, resultType string, opt Opt var histogram *histogramInfo - for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() { + for l1Field, err := iter.ReadObject(); l1Field != ""; l1Field, err = iter.ReadObject() { + if err != nil { + return rspErr(err) + } switch l1Field { case "metric": - iter.ReadVal(&valueField.Labels) + if err = iter.ReadVal(&valueField.Labels); err != nil { + return rspErr(err) + } case "value": t, v, err := readTimeValuePair(iter) - if err == nil { - timeField.Append(t) - valueField.Append(v) + if err != nil { + return rspErr(err) } + timeField.Append(t) + valueField.Append(v) // nolint:goconst case "values": - for iter.ReadArray() { - t, v, err := readTimeValuePair(iter) - if err == nil { - timeField.Append(t) - valueField.Append(v) + for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() { + if err != nil { + return rspErr(err) } + t, v, err := readTimeValuePair(iter) + if err != nil { + return rspErr(err) + } + timeField.Append(t) + valueField.Append(v) } case "histogram": if histogram == nil { histogram = newHistogramInfo() } - err := readHistogram(iter, histogram) + err = readHistogram(iter, histogram) if err != nil { - rsp.Error = err + return rspErr(err) } case "histograms": if histogram == nil { histogram = newHistogramInfo() } - for iter.ReadArray() { - err := readHistogram(iter, histogram) + for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() { if err != nil { - rsp.Error = err + return rspErr(err) + } + if err = readHistogram(iter, histogram); err != nil { + return rspErr(err) } } default: - iter.Skip() + if err = iter.Skip(); err != nil { + return rspErr(err) + } logf("readMatrixOrVector: %s\n", l1Field) } } @@ -439,12 +629,28 @@ func readMatrixOrVectorMulti(iter *jsoniter.Iterator, resultType string, opt Opt return rsp } -func readTimeValuePair(iter *jsoniter.Iterator) (time.Time, float64, error) { - iter.ReadArray() - t := iter.ReadFloat64() - iter.ReadArray() - v := iter.ReadString() - iter.ReadArray() +func readTimeValuePair(iter *jsonitere.Iterator) (time.Time, float64, error) { + if _, err := iter.ReadArray(); err != nil { + return time.Time{}, 0, err + } + + t, err := iter.ReadFloat64() + if err != nil { + return time.Time{}, 0, err + } + + if _, err = iter.ReadArray(); err != nil { + return time.Time{}, 0, err + } + + var v string + if v, err = iter.ReadString(); err != nil { + return time.Time{}, 0, err + } + + if _, err = iter.ReadArray(); err != nil { + return time.Time{}, 0, err + } tt := timeFromFloat(t) fv, err := strconv.ParseFloat(v, 64) @@ -478,75 +684,123 @@ func newHistogramInfo() *histogramInfo { // This will read a single sparse histogram // [ time, { count, sum, buckets: [...] }] -func readHistogram(iter *jsoniter.Iterator, hist *histogramInfo) error { +func readHistogram(iter *jsonitere.Iterator, hist *histogramInfo) error { // first element - iter.ReadArray() - t := timeFromFloat(iter.ReadFloat64()) + if _, err := iter.ReadArray(); err != nil { + return err + } - var err error + f, err := iter.ReadFloat64() + if err != nil { + return err + } + t := timeFromFloat(f) // next object element - iter.ReadArray() - for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() { + if _, err := iter.ReadArray(); err != nil { + return err + } + + for l1Field, err := iter.ReadObject(); l1Field != ""; l1Field, err = iter.ReadObject() { + if err != nil { + return err + } switch l1Field { case "count": - iter.Skip() + if err = iter.Skip(); err != nil { + return err + } case "sum": - iter.Skip() + if err = iter.Skip(); err != nil { + return err + } case "buckets": - for iter.ReadArray() { - hist.time.Append(t) - - iter.ReadArray() - hist.yLayout.Append(iter.ReadInt8()) - - iter.ReadArray() - err = appendValueFromString(iter, hist.yMin) + for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() { if err != nil { return err } + hist.time.Append(t) + + if _, err := iter.ReadArray(); err != nil { + return err + } + + v, err := iter.ReadInt8() + if err != nil { + return err + } + hist.yLayout.Append(v) + + if _, err := iter.ReadArray(); err != nil { + return err + } + + if err = appendValueFromString(iter, hist.yMin); err != nil { + return err + } + + if _, err := iter.ReadArray(); err != nil { + return err + } - iter.ReadArray() err = appendValueFromString(iter, hist.yMax) if err != nil { return err } - iter.ReadArray() + if _, err := iter.ReadArray(); err != nil { + return err + } + err = appendValueFromString(iter, hist.count) if err != nil { return err } - if iter.ReadArray() { + for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() { + if err != nil { + return err + } return fmt.Errorf("expected close array") } } default: - iter.Skip() + if err = iter.Skip(); err != nil { + return err + } logf("[SKIP]readHistogram: %s\n", l1Field) } } - if iter.ReadArray() { + if more, err := iter.ReadArray(); more || err != nil { + if err != nil { + return err + } return fmt.Errorf("expected to be done") } return nil } -func appendValueFromString(iter *jsoniter.Iterator, field *data.Field) error { - v, err := strconv.ParseFloat(iter.ReadString(), 64) - if err != nil { +func appendValueFromString(iter *jsonitere.Iterator, field *data.Field) error { + var err error + var s string + if s, err = iter.ReadString(); err != nil { return err } + + var v float64 + if v, err = strconv.ParseFloat(s, 64); err != nil { + return err + } + field.Append(v) return nil } -func readStream(iter *jsoniter.Iterator) backend.DataResponse { +func readStream(iter *jsonitere.Iterator) backend.DataResponse { rsp := backend.DataResponse{} labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0) @@ -568,34 +822,73 @@ func readStream(iter *jsoniter.Iterator) backend.DataResponse { return backend.DataResponse{Error: err} } - for iter.ReadArray() { - for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() { + for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() { + if err != nil { + rspErr(err) + } + + l1Fields: + for l1Field, err := iter.ReadObject(); ; l1Field, err = iter.ReadObject() { + if err != nil { + return rspErr(err) + } switch l1Field { case "stream": // we need to clear `labels`, because `iter.ReadVal` // only appends to it labels := data.Labels{} - iter.ReadVal(&labels) - labelJson, err = labelsToRawJson(labels) - if err != nil { - return backend.DataResponse{Error: err} + if err = iter.ReadVal(&labels); err != nil { + return rspErr(err) + } + + if labelJson, err = labelsToRawJson(labels); err != nil { + return rspErr(err) } case "values": - for iter.ReadArray() { - iter.ReadArray() - ts := iter.ReadString() - iter.ReadArray() - line := iter.ReadString() - iter.ReadArray() + for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() { + if err != nil { + rsp.Error = err + return rsp + } - t := timeFromLokiString(ts) + if _, err = iter.ReadArray(); err != nil { + return rspErr(err) + } + + ts, err := iter.ReadString() + if err != nil { + return rspErr(err) + } + + if _, err = iter.ReadArray(); err != nil { + return rspErr(err) + } + + line, err := iter.ReadString() + if err != nil { + return rspErr(err) + } + + if _, err = iter.ReadArray(); err != nil { + return rspErr(err) + } + + t, err := timeFromLokiString(ts) + if err != nil { + return rspErr(err) + } labelsField.Append(labelJson) timeField.Append(t) lineField.Append(line) tsField.Append(ts) } + case "": + if err != nil { + return rspErr(err) + } + break l1Fields } } } @@ -615,7 +908,7 @@ func timeFromFloat(fv float64) time.Time { return time.UnixMilli(int64(fv * 1000.0)).UTC() } -func timeFromLokiString(str string) time.Time { +func timeFromLokiString(str string) (time.Time, error) { // normal time values look like: 1645030246277587968 // and are less than: math.MaxInt65=9223372036854775807 // This will do a fast path for any date before 2033 @@ -623,13 +916,17 @@ func timeFromLokiString(str string) time.Time { if s < 19 || (s == 19 && str[0] == '1') { ns, err := strconv.ParseInt(str, 10, 64) if err == nil { - return time.Unix(0, ns).UTC() + return time.Unix(0, ns).UTC(), nil } } + if s < 10 { + return time.Time{}, fmt.Errorf("unexpected time format '%v' in response. response may have been truncated", str) + } + ss, _ := strconv.ParseInt(str[0:10], 10, 64) ns, _ := strconv.ParseInt(str[10:], 10, 64) - return time.Unix(ss, ns).UTC() + return time.Unix(ss, ns).UTC(), nil } func labelsToRawJson(labels data.Labels) (json.RawMessage, error) { diff --git a/pkg/util/converter/prom_test.go b/pkg/util/converter/prom_test.go index 5f8a99af559..693eefa6600 100644 --- a/pkg/util/converter/prom_test.go +++ b/pkg/util/converter/prom_test.go @@ -1,6 +1,7 @@ package converter import ( + "fmt" "os" "path" "strings" @@ -8,39 +9,63 @@ import ( "time" "github.com/grafana/grafana-plugin-sdk-go/experimental" + "github.com/grafana/grafana/pkg/infra/httpclient" jsoniter "github.com/json-iterator/go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -const update = true +const update = false + +var files = []string{ + "prom-labels", + "prom-matrix", + "prom-matrix-with-nans", + "prom-matrix-histogram-no-labels", + "prom-matrix-histogram-partitioned", + "prom-vector-histogram-no-labels", + "prom-vector", + "prom-string", + "prom-scalar", + "prom-series", + "prom-warnings", + "prom-error", + "prom-exemplars-a", + "prom-exemplars-b", + "loki-streams-a", + "loki-streams-b", + "loki-streams-c", +} func TestReadPromFrames(t *testing.T) { - files := []string{ - "prom-labels", - "prom-matrix", - "prom-matrix-with-nans", - "prom-matrix-histogram-no-labels", - "prom-matrix-histogram-partitioned", - "prom-vector-histogram-no-labels", - "prom-vector", - "prom-string", - "prom-scalar", - "prom-series", - "prom-warnings", - "prom-error", - "prom-exemplars-a", - "prom-exemplars-b", - "loki-streams-a", - "loki-streams-b", - "loki-streams-c", - } - for _, name := range files { t.Run(name, runScenario(name, Options{})) } } +func TestReadLimited(t *testing.T) { + for _, name := range files { + p := path.Join("testdata", name+".json") + stat, err := os.Stat(p) + require.NoError(t, err) + size := stat.Size() + + for i := int64(10); i < size-1; i += size / 10 { + t.Run(fmt.Sprintf("%v_%v", name, i), func(t *testing.T) { + //nolint:gosec + f, err := os.Open(p) + require.NoError(t, err) + mbr := httpclient.MaxBytesReader(f, i) + + iter := jsoniter.Parse(jsoniter.ConfigDefault, mbr, 1024) + rsp := ReadPrometheusStyleResult(iter, Options{}) + + require.ErrorContains(t, rsp.Error, "response body too large") + }) + } + } +} + // FIXME: // //lint:ignore U1000 Ignore used function for now @@ -58,6 +83,7 @@ func runScenario(name string, opts Options) func(t *testing.T) { require.Error(t, rsp.Error) return } + require.NoError(t, rsp.Error) fname := name + "-frame" experimental.CheckGoldenJSONResponse(t, "testdata", fname, &rsp, update) @@ -70,12 +96,17 @@ func TestTimeConversions(t *testing.T) { time.Date(2020, time.September, 14, 15, 22, 25, 479000000, time.UTC), timeFromFloat(1600096945.479)) + ti, err := timeFromLokiString("1645030246277587968") + require.NoError(t, err) // Loki date parsing assert.Equal(t, time.Date(2022, time.February, 16, 16, 50, 46, 277587968, time.UTC), - timeFromLokiString("1645030246277587968")) + ti) + + ti, err = timeFromLokiString("2000000000000000000") + require.NoError(t, err) assert.Equal(t, time.Date(2033, time.May, 18, 3, 33, 20, 0, time.UTC), - timeFromLokiString("2000000000000000000")) + ti) }