Prometheus: Check for errors on json response parsing (#73788)

- The util/converter Prometheus response json parse was not checking for errors while parsing. It now does. In particular, if `[dataproxy]/response_limit` is set in Grafana's config, it will now recognize the limit error.
- Fixes #73747
- Adds `jsonitere` package, which wraps json-iterator/go's Iterator's Methods with methods that return errors, so errcheck linting can be relied upon
- Impact:
  - If something was sending malformed JSON to the prometheus or loki datasources, the previous code might have accepted that and partially processed the data
  - Before there may have been partial data with no error, where as no there may be errors but they will have no partial results, just the error.
This commit is contained in:
Kyle Brandt 2023-08-28 08:53:50 -04:00 committed by GitHub
parent 97c3cd1af3
commit 38603b1a9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 550 additions and 160 deletions

View File

@ -5,7 +5,6 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"testing"
@ -74,7 +73,7 @@ func TestIntegrationPrometheus(t *testing.T) {
"datasource": map[string]interface{}{
"uid": uid,
},
"expr": "up",
"expr": "1",
"instantQuery": true,
})
buf1 := &bytes.Buffer{}
@ -88,13 +87,9 @@ func TestIntegrationPrometheus(t *testing.T) {
// nolint:gosec
resp, err := http.Post(u, "application/json", buf1)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
t.Cleanup(func() {
err := resp.Body.Close()
require.NoError(t, err)
_ = resp.Body.Close()
})
_, err = io.ReadAll(resp.Body)
require.NoError(t, err)
require.NotNil(t, outgoingRequest)
require.Equal(t, "/api/v1/query_range?q1=1&q2=2", outgoingRequest.URL.String())
@ -124,13 +119,9 @@ func TestIntegrationPrometheus(t *testing.T) {
// nolint:gosec
resp, err := http.Post(u, "application/json", buf1)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
t.Cleanup(func() {
err := resp.Body.Close()
require.NoError(t, err)
_ = resp.Body.Close()
})
_, err = io.ReadAll(resp.Body)
require.NoError(t, err)
require.NotNil(t, outgoingRequest)
require.Equal(t, "/api/v1/query_range", outgoingRequest.URL.Path)

View File

@ -2,7 +2,9 @@ package prometheus
import (
"context"
"io"
"net/http"
"strings"
"testing"
"time"
@ -27,10 +29,19 @@ type healthCheckFailRoundTripper struct {
func (rt *healthCheckSuccessRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
return &http.Response{
Status: "200",
StatusCode: 200,
Header: nil,
Body: nil,
Status: "200",
StatusCode: 200,
Header: nil,
Body: io.NopCloser(strings.NewReader(`{
"status": "success",
"data": {
"resultType": "scalar",
"result": [
1692969348.331,
"2"
]
}
}`)),
ContentLength: 0,
Request: req,
}, nil

View File

@ -0,0 +1,60 @@
// package jsonitere wraps json-iterator/go's Iterator methods with error returns
// so linting can catch unchecked errors.
// The underlying iterator's Error property is returned and not reset.
// See json-iterator/go for method documentation and additional methods that
// can be added to this library.
package jsonitere
import j "github.com/json-iterator/go"
type Iterator struct {
// named property instead of embedded so there is no
// confusion about which method or property is called
i *j.Iterator
}
func NewIterator(i *j.Iterator) *Iterator {
return &Iterator{i}
}
func (iter *Iterator) Read() (interface{}, error) {
return iter.i.Read(), iter.i.Error
}
func (iter *Iterator) ReadAny() (j.Any, error) {
return iter.i.ReadAny(), iter.i.Error
}
func (iter *Iterator) ReadArray() (bool, error) {
return iter.i.ReadArray(), iter.i.Error
}
func (iter *Iterator) ReadObject() (string, error) {
return iter.i.ReadObject(), iter.i.Error
}
func (iter *Iterator) ReadString() (string, error) {
return iter.i.ReadString(), iter.i.Error
}
func (iter *Iterator) WhatIsNext() (j.ValueType, error) {
return iter.i.WhatIsNext(), iter.i.Error
}
func (iter *Iterator) Skip() error {
iter.i.Skip()
return iter.i.Error
}
func (iter *Iterator) ReadVal(obj interface{}) error {
iter.i.ReadVal(obj)
return iter.i.Error
}
func (iter *Iterator) ReadFloat64() (float64, error) {
return iter.i.ReadFloat64(), iter.i.Error
}
func (iter *Iterator) ReadInt8() (int8, error) {
return iter.i.ReadInt8(), iter.i.Error
}

View File

@ -8,6 +8,7 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/util/converter/jsonitere"
jsoniter "github.com/json-iterator/go"
)
@ -20,40 +21,70 @@ type Options struct {
Dataplane bool
}
func rspErr(e error) backend.DataResponse {
return backend.DataResponse{Error: e}
}
// ReadPrometheusStyleResult will read results from a prometheus or loki server and return data frames
func ReadPrometheusStyleResult(iter *jsoniter.Iterator, opt Options) backend.DataResponse {
func ReadPrometheusStyleResult(jIter *jsoniter.Iterator, opt Options) backend.DataResponse {
iter := jsonitere.NewIterator(jIter)
var rsp backend.DataResponse
status := "unknown"
errorType := ""
err := ""
promErrString := ""
warnings := []data.Notice{}
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
l1Fields:
for l1Field, err := iter.ReadObject(); ; l1Field, err = iter.ReadObject() {
if err != nil {
return rspErr(err)
}
switch l1Field {
case "status":
status = iter.ReadString()
if status, err = iter.ReadString(); err != nil {
return rspErr(err)
}
case "data":
rsp = readPrometheusData(iter, opt)
if rsp.Error != nil {
return rsp
}
case "error":
err = iter.ReadString()
if promErrString, err = iter.ReadString(); err != nil {
return rspErr(err)
}
case "errorType":
errorType = iter.ReadString()
if errorType, err = iter.ReadString(); err != nil {
return rspErr(err)
}
case "warnings":
warnings = readWarnings(iter)
if warnings, err = readWarnings(iter); err != nil {
return rspErr(err)
}
case "":
if err != nil {
return rspErr(err)
}
break l1Fields
default:
v := iter.Read()
v, err := iter.Read()
if err != nil {
rsp.Error = err
return rsp
}
logf("[ROOT] TODO, support key: %s / %v\n", l1Field, v)
}
}
if status == "error" {
return backend.DataResponse{
Error: fmt.Errorf("%s: %s", errorType, err),
Error: fmt.Errorf("%s: %s", errorType, promErrString),
}
}
@ -69,27 +100,48 @@ func ReadPrometheusStyleResult(iter *jsoniter.Iterator, opt Options) backend.Dat
return rsp
}
func readWarnings(iter *jsoniter.Iterator) []data.Notice {
func readWarnings(iter *jsonitere.Iterator) ([]data.Notice, error) {
warnings := []data.Notice{}
if iter.WhatIsNext() != jsoniter.ArrayValue {
return warnings
next, err := iter.WhatIsNext()
if err != nil {
return nil, err
}
for iter.ReadArray() {
if iter.WhatIsNext() == jsoniter.StringValue {
if next != jsoniter.ArrayValue {
return warnings, nil
}
for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() {
if err != nil {
return nil, err
}
next, err := iter.WhatIsNext()
if err != nil {
return nil, err
}
if next == jsoniter.StringValue {
s, err := iter.ReadString()
if err != nil {
return nil, err
}
notice := data.Notice{
Severity: data.NoticeSeverityWarning,
Text: iter.ReadString(),
Text: s,
}
warnings = append(warnings, notice)
}
}
return warnings
return warnings, nil
}
func readPrometheusData(iter *jsoniter.Iterator, opt Options) backend.DataResponse {
t := iter.WhatIsNext()
func readPrometheusData(iter *jsonitere.Iterator, opt Options) backend.DataResponse {
var rsp backend.DataResponse
t, err := iter.WhatIsNext()
if err != nil {
return rspErr(err)
}
if t == jsoniter.ArrayValue {
return readArrayData(iter)
}
@ -101,32 +153,54 @@ func readPrometheusData(iter *jsoniter.Iterator, opt Options) backend.DataRespon
}
resultType := ""
var rsp backend.DataResponse
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
l1Fields:
for l1Field, err := iter.ReadObject(); ; l1Field, err = iter.ReadObject() {
if err != nil {
return rspErr(err)
}
switch l1Field {
case "resultType":
resultType = iter.ReadString()
resultType, err = iter.ReadString()
if err != nil {
return rspErr(err)
}
case "result":
switch resultType {
case "matrix", "vector":
rsp = readMatrixOrVectorMulti(iter, resultType, opt)
if rsp.Error != nil {
return rsp
}
case "streams":
rsp = readStream(iter)
if rsp.Error != nil {
return rsp
}
case "string":
rsp = readString(iter)
if rsp.Error != nil {
return rsp
}
case "scalar":
rsp = readScalar(iter)
if rsp.Error != nil {
return rsp
}
default:
iter.Skip()
if err = iter.Skip(); err != nil {
return rspErr(err)
}
rsp = backend.DataResponse{
Error: fmt.Errorf("unknown result type: %s", resultType),
}
}
case "stats":
v := iter.Read()
v, err := iter.Read()
if err != nil {
rspErr(err)
}
if len(rsp.Frames) > 0 {
meta := rsp.Frames[0].Meta
if meta == nil {
@ -138,8 +212,17 @@ func readPrometheusData(iter *jsoniter.Iterator, opt Options) backend.DataRespon
}
}
case "":
if err != nil {
return rspErr(err)
}
break l1Fields
default:
v := iter.Read()
v, err := iter.Read()
if err != nil {
return rspErr(err)
}
logf("[data] TODO, support key: %s / %v\n", l1Field, v)
}
}
@ -148,21 +231,38 @@ func readPrometheusData(iter *jsoniter.Iterator, opt Options) backend.DataRespon
}
// will return strings or exemplars
func readArrayData(iter *jsoniter.Iterator) backend.DataResponse {
func readArrayData(iter *jsonitere.Iterator) backend.DataResponse {
lookup := make(map[string]*data.Field)
var labelFrame *data.Frame
rsp := backend.DataResponse{}
stringField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
stringField.Name = "Value"
for iter.ReadArray() {
switch iter.WhatIsNext() {
for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() {
if err != nil {
rspErr(err)
}
next, err := iter.WhatIsNext()
if err != nil {
return rspErr(err)
}
switch next {
case jsoniter.StringValue:
stringField.Append(iter.ReadString())
s, err := iter.ReadString()
if err != nil {
return rspErr(err)
}
stringField.Append(s)
// Either label or exemplars
case jsoniter.ObjectValue:
exemplar, labelPairs := readLabelsOrExemplars(iter)
exemplar, labelPairs, err := readLabelsOrExemplars(iter)
if err != nil {
rspErr(err)
}
if exemplar != nil {
rsp.Frames = append(rsp.Frames, exemplar)
} else if labelPairs != nil {
@ -199,7 +299,10 @@ func readArrayData(iter *jsoniter.Iterator) backend.DataResponse {
default:
{
ext := iter.ReadAny()
ext, err := iter.ReadAny()
if err != nil {
rspErr(err)
}
v := fmt.Sprintf("%v", ext)
stringField.Append(v)
}
@ -214,23 +317,38 @@ func readArrayData(iter *jsoniter.Iterator) backend.DataResponse {
}
// For consistent ordering read values to an array not a map
func readLabelsAsPairs(iter *jsoniter.Iterator) [][2]string {
func readLabelsAsPairs(iter *jsonitere.Iterator) ([][2]string, error) {
pairs := make([][2]string, 0, 10)
for k := iter.ReadObject(); k != ""; k = iter.ReadObject() {
pairs = append(pairs, [2]string{k, iter.ReadString()})
for k, err := iter.ReadObject(); k != ""; k, err = iter.ReadObject() {
if err != nil {
return nil, err
}
v, err := iter.ReadString()
if err != nil {
return nil, err
}
pairs = append(pairs, [2]string{k, v})
}
return pairs
return pairs, nil
}
func readLabelsOrExemplars(iter *jsoniter.Iterator) (*data.Frame, [][2]string) {
func readLabelsOrExemplars(iter *jsonitere.Iterator) (*data.Frame, [][2]string, error) {
pairs := make([][2]string, 0, 10)
labels := data.Labels{}
var frame *data.Frame
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
l1Fields:
for l1Field, err := iter.ReadObject(); ; l1Field, err = iter.ReadObject() {
if err != nil {
return nil, nil, err
}
switch l1Field {
case "seriesLabels":
iter.ReadVal(&labels)
err = iter.ReadVal(&labels)
if err != nil {
return nil, nil, err
}
case "exemplars":
lookup := make(map[string]*data.Field)
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
@ -242,21 +360,42 @@ func readLabelsOrExemplars(iter *jsoniter.Iterator) (*data.Frame, [][2]string) {
frame.Meta = &data.FrameMeta{
Custom: resultTypeToCustomMeta("exemplar"),
}
for iter.ReadArray() {
for l2Field := iter.ReadObject(); l2Field != ""; l2Field = iter.ReadObject() {
for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() {
if err != nil {
return nil, nil, err
}
for l2Field, err := iter.ReadObject(); l2Field != ""; l2Field, err = iter.ReadObject() {
if err != nil {
return nil, nil, err
}
switch l2Field {
// nolint:goconst
case "value":
v, _ := strconv.ParseFloat(iter.ReadString(), 64)
s, err := iter.ReadString()
if err != nil {
return nil, nil, err
}
v, err := strconv.ParseFloat(s, 64)
if err != nil {
return nil, nil, err
}
valueField.Append(v)
case "timestamp":
ts := timeFromFloat(iter.ReadFloat64())
f, err := iter.ReadFloat64()
if err != nil {
return nil, nil, err
}
ts := timeFromFloat(f)
timeField.Append(ts)
case "labels":
max := 0
for _, pair := range readLabelsAsPairs(iter) {
pairs, err := readLabelsAsPairs(iter)
if err != nil {
return nil, nil, err
}
for _, pair := range pairs {
k := pair[0]
v := pair[1]
f, ok := lookup[k]
@ -281,7 +420,10 @@ func readLabelsOrExemplars(iter *jsoniter.Iterator) (*data.Frame, [][2]string) {
}
default:
iter.Skip()
if err = iter.Skip(); err != nil {
return nil, nil, err
}
frame.AppendNotices(data.Notice{
Severity: data.NoticeSeverityError,
Text: fmt.Sprintf("unable to parse key: %s in response body", l2Field),
@ -289,27 +431,54 @@ func readLabelsOrExemplars(iter *jsoniter.Iterator) (*data.Frame, [][2]string) {
}
}
}
case "":
if err != nil {
return nil, nil, err
}
break l1Fields
default:
v := fmt.Sprintf("%v", iter.Read())
iV, err := iter.Read()
if err != nil {
return nil, nil, err
}
v := fmt.Sprintf("%v", iV)
pairs = append(pairs, [2]string{l1Field, v})
}
}
return frame, pairs
return frame, pairs, nil
}
func readString(iter *jsoniter.Iterator) backend.DataResponse {
func readString(iter *jsonitere.Iterator) backend.DataResponse {
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeField.Name = data.TimeSeriesTimeFieldName
valueField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
valueField.Name = data.TimeSeriesValueFieldName
valueField.Labels = data.Labels{}
iter.ReadArray()
t := iter.ReadFloat64()
iter.ReadArray()
v := iter.ReadString()
iter.ReadArray()
_, err := iter.ReadArray()
if err != nil {
return rspErr(err)
}
var t float64
if t, err = iter.ReadFloat64(); err != nil {
return rspErr(err)
}
if _, err = iter.ReadArray(); err != nil {
return rspErr(err)
}
var v string
if v, err = iter.ReadString(); err != nil {
return rspErr(err)
}
if _, err = iter.ReadArray(); err != nil {
return rspErr(err)
}
tt := timeFromFloat(t)
timeField.Append(tt)
@ -326,7 +495,9 @@ func readString(iter *jsoniter.Iterator) backend.DataResponse {
}
}
func readScalar(iter *jsoniter.Iterator) backend.DataResponse {
func readScalar(iter *jsonitere.Iterator) backend.DataResponse {
rsp := backend.DataResponse{}
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeField.Name = data.TimeSeriesTimeFieldName
valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, 0)
@ -334,10 +505,12 @@ func readScalar(iter *jsoniter.Iterator) backend.DataResponse {
valueField.Labels = data.Labels{}
t, v, err := readTimeValuePair(iter)
if err == nil {
timeField.Append(t)
valueField.Append(v)
if err != nil {
rsp.Error = err
return rsp
}
timeField.Append(t)
valueField.Append(v)
frame := data.NewFrame("", timeField, valueField)
frame.Meta = &data.FrameMeta{
@ -350,10 +523,13 @@ func readScalar(iter *jsoniter.Iterator) backend.DataResponse {
}
}
func readMatrixOrVectorMulti(iter *jsoniter.Iterator, resultType string, opt Options) backend.DataResponse {
func readMatrixOrVectorMulti(iter *jsonitere.Iterator, resultType string, opt Options) backend.DataResponse {
rsp := backend.DataResponse{}
for iter.ReadArray() {
for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() {
if err != nil {
return rspErr(err)
}
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeField.Name = data.TimeSeriesTimeFieldName
valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, 0)
@ -362,50 +538,64 @@ func readMatrixOrVectorMulti(iter *jsoniter.Iterator, resultType string, opt Opt
var histogram *histogramInfo
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
for l1Field, err := iter.ReadObject(); l1Field != ""; l1Field, err = iter.ReadObject() {
if err != nil {
return rspErr(err)
}
switch l1Field {
case "metric":
iter.ReadVal(&valueField.Labels)
if err = iter.ReadVal(&valueField.Labels); err != nil {
return rspErr(err)
}
case "value":
t, v, err := readTimeValuePair(iter)
if err == nil {
timeField.Append(t)
valueField.Append(v)
if err != nil {
return rspErr(err)
}
timeField.Append(t)
valueField.Append(v)
// nolint:goconst
case "values":
for iter.ReadArray() {
t, v, err := readTimeValuePair(iter)
if err == nil {
timeField.Append(t)
valueField.Append(v)
for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() {
if err != nil {
return rspErr(err)
}
t, v, err := readTimeValuePair(iter)
if err != nil {
return rspErr(err)
}
timeField.Append(t)
valueField.Append(v)
}
case "histogram":
if histogram == nil {
histogram = newHistogramInfo()
}
err := readHistogram(iter, histogram)
err = readHistogram(iter, histogram)
if err != nil {
rsp.Error = err
return rspErr(err)
}
case "histograms":
if histogram == nil {
histogram = newHistogramInfo()
}
for iter.ReadArray() {
err := readHistogram(iter, histogram)
for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() {
if err != nil {
rsp.Error = err
return rspErr(err)
}
if err = readHistogram(iter, histogram); err != nil {
return rspErr(err)
}
}
default:
iter.Skip()
if err = iter.Skip(); err != nil {
return rspErr(err)
}
logf("readMatrixOrVector: %s\n", l1Field)
}
}
@ -439,12 +629,28 @@ func readMatrixOrVectorMulti(iter *jsoniter.Iterator, resultType string, opt Opt
return rsp
}
func readTimeValuePair(iter *jsoniter.Iterator) (time.Time, float64, error) {
iter.ReadArray()
t := iter.ReadFloat64()
iter.ReadArray()
v := iter.ReadString()
iter.ReadArray()
func readTimeValuePair(iter *jsonitere.Iterator) (time.Time, float64, error) {
if _, err := iter.ReadArray(); err != nil {
return time.Time{}, 0, err
}
t, err := iter.ReadFloat64()
if err != nil {
return time.Time{}, 0, err
}
if _, err = iter.ReadArray(); err != nil {
return time.Time{}, 0, err
}
var v string
if v, err = iter.ReadString(); err != nil {
return time.Time{}, 0, err
}
if _, err = iter.ReadArray(); err != nil {
return time.Time{}, 0, err
}
tt := timeFromFloat(t)
fv, err := strconv.ParseFloat(v, 64)
@ -478,75 +684,123 @@ func newHistogramInfo() *histogramInfo {
// This will read a single sparse histogram
// [ time, { count, sum, buckets: [...] }]
func readHistogram(iter *jsoniter.Iterator, hist *histogramInfo) error {
func readHistogram(iter *jsonitere.Iterator, hist *histogramInfo) error {
// first element
iter.ReadArray()
t := timeFromFloat(iter.ReadFloat64())
if _, err := iter.ReadArray(); err != nil {
return err
}
var err error
f, err := iter.ReadFloat64()
if err != nil {
return err
}
t := timeFromFloat(f)
// next object element
iter.ReadArray()
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
if _, err := iter.ReadArray(); err != nil {
return err
}
for l1Field, err := iter.ReadObject(); l1Field != ""; l1Field, err = iter.ReadObject() {
if err != nil {
return err
}
switch l1Field {
case "count":
iter.Skip()
if err = iter.Skip(); err != nil {
return err
}
case "sum":
iter.Skip()
if err = iter.Skip(); err != nil {
return err
}
case "buckets":
for iter.ReadArray() {
hist.time.Append(t)
iter.ReadArray()
hist.yLayout.Append(iter.ReadInt8())
iter.ReadArray()
err = appendValueFromString(iter, hist.yMin)
for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() {
if err != nil {
return err
}
hist.time.Append(t)
if _, err := iter.ReadArray(); err != nil {
return err
}
v, err := iter.ReadInt8()
if err != nil {
return err
}
hist.yLayout.Append(v)
if _, err := iter.ReadArray(); err != nil {
return err
}
if err = appendValueFromString(iter, hist.yMin); err != nil {
return err
}
if _, err := iter.ReadArray(); err != nil {
return err
}
iter.ReadArray()
err = appendValueFromString(iter, hist.yMax)
if err != nil {
return err
}
iter.ReadArray()
if _, err := iter.ReadArray(); err != nil {
return err
}
err = appendValueFromString(iter, hist.count)
if err != nil {
return err
}
if iter.ReadArray() {
for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() {
if err != nil {
return err
}
return fmt.Errorf("expected close array")
}
}
default:
iter.Skip()
if err = iter.Skip(); err != nil {
return err
}
logf("[SKIP]readHistogram: %s\n", l1Field)
}
}
if iter.ReadArray() {
if more, err := iter.ReadArray(); more || err != nil {
if err != nil {
return err
}
return fmt.Errorf("expected to be done")
}
return nil
}
func appendValueFromString(iter *jsoniter.Iterator, field *data.Field) error {
v, err := strconv.ParseFloat(iter.ReadString(), 64)
if err != nil {
func appendValueFromString(iter *jsonitere.Iterator, field *data.Field) error {
var err error
var s string
if s, err = iter.ReadString(); err != nil {
return err
}
var v float64
if v, err = strconv.ParseFloat(s, 64); err != nil {
return err
}
field.Append(v)
return nil
}
func readStream(iter *jsoniter.Iterator) backend.DataResponse {
func readStream(iter *jsonitere.Iterator) backend.DataResponse {
rsp := backend.DataResponse{}
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
@ -568,34 +822,73 @@ func readStream(iter *jsoniter.Iterator) backend.DataResponse {
return backend.DataResponse{Error: err}
}
for iter.ReadArray() {
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() {
for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() {
if err != nil {
rspErr(err)
}
l1Fields:
for l1Field, err := iter.ReadObject(); ; l1Field, err = iter.ReadObject() {
if err != nil {
return rspErr(err)
}
switch l1Field {
case "stream":
// we need to clear `labels`, because `iter.ReadVal`
// only appends to it
labels := data.Labels{}
iter.ReadVal(&labels)
labelJson, err = labelsToRawJson(labels)
if err != nil {
return backend.DataResponse{Error: err}
if err = iter.ReadVal(&labels); err != nil {
return rspErr(err)
}
if labelJson, err = labelsToRawJson(labels); err != nil {
return rspErr(err)
}
case "values":
for iter.ReadArray() {
iter.ReadArray()
ts := iter.ReadString()
iter.ReadArray()
line := iter.ReadString()
iter.ReadArray()
for more, err := iter.ReadArray(); more; more, err = iter.ReadArray() {
if err != nil {
rsp.Error = err
return rsp
}
t := timeFromLokiString(ts)
if _, err = iter.ReadArray(); err != nil {
return rspErr(err)
}
ts, err := iter.ReadString()
if err != nil {
return rspErr(err)
}
if _, err = iter.ReadArray(); err != nil {
return rspErr(err)
}
line, err := iter.ReadString()
if err != nil {
return rspErr(err)
}
if _, err = iter.ReadArray(); err != nil {
return rspErr(err)
}
t, err := timeFromLokiString(ts)
if err != nil {
return rspErr(err)
}
labelsField.Append(labelJson)
timeField.Append(t)
lineField.Append(line)
tsField.Append(ts)
}
case "":
if err != nil {
return rspErr(err)
}
break l1Fields
}
}
}
@ -615,7 +908,7 @@ func timeFromFloat(fv float64) time.Time {
return time.UnixMilli(int64(fv * 1000.0)).UTC()
}
func timeFromLokiString(str string) time.Time {
func timeFromLokiString(str string) (time.Time, error) {
// normal time values look like: 1645030246277587968
// and are less than: math.MaxInt65=9223372036854775807
// This will do a fast path for any date before 2033
@ -623,13 +916,17 @@ func timeFromLokiString(str string) time.Time {
if s < 19 || (s == 19 && str[0] == '1') {
ns, err := strconv.ParseInt(str, 10, 64)
if err == nil {
return time.Unix(0, ns).UTC()
return time.Unix(0, ns).UTC(), nil
}
}
if s < 10 {
return time.Time{}, fmt.Errorf("unexpected time format '%v' in response. response may have been truncated", str)
}
ss, _ := strconv.ParseInt(str[0:10], 10, 64)
ns, _ := strconv.ParseInt(str[10:], 10, 64)
return time.Unix(ss, ns).UTC()
return time.Unix(ss, ns).UTC(), nil
}
func labelsToRawJson(labels data.Labels) (json.RawMessage, error) {

View File

@ -1,6 +1,7 @@
package converter
import (
"fmt"
"os"
"path"
"strings"
@ -8,39 +9,63 @@ import (
"time"
"github.com/grafana/grafana-plugin-sdk-go/experimental"
"github.com/grafana/grafana/pkg/infra/httpclient"
jsoniter "github.com/json-iterator/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const update = true
const update = false
var files = []string{
"prom-labels",
"prom-matrix",
"prom-matrix-with-nans",
"prom-matrix-histogram-no-labels",
"prom-matrix-histogram-partitioned",
"prom-vector-histogram-no-labels",
"prom-vector",
"prom-string",
"prom-scalar",
"prom-series",
"prom-warnings",
"prom-error",
"prom-exemplars-a",
"prom-exemplars-b",
"loki-streams-a",
"loki-streams-b",
"loki-streams-c",
}
func TestReadPromFrames(t *testing.T) {
files := []string{
"prom-labels",
"prom-matrix",
"prom-matrix-with-nans",
"prom-matrix-histogram-no-labels",
"prom-matrix-histogram-partitioned",
"prom-vector-histogram-no-labels",
"prom-vector",
"prom-string",
"prom-scalar",
"prom-series",
"prom-warnings",
"prom-error",
"prom-exemplars-a",
"prom-exemplars-b",
"loki-streams-a",
"loki-streams-b",
"loki-streams-c",
}
for _, name := range files {
t.Run(name, runScenario(name, Options{}))
}
}
func TestReadLimited(t *testing.T) {
for _, name := range files {
p := path.Join("testdata", name+".json")
stat, err := os.Stat(p)
require.NoError(t, err)
size := stat.Size()
for i := int64(10); i < size-1; i += size / 10 {
t.Run(fmt.Sprintf("%v_%v", name, i), func(t *testing.T) {
//nolint:gosec
f, err := os.Open(p)
require.NoError(t, err)
mbr := httpclient.MaxBytesReader(f, i)
iter := jsoniter.Parse(jsoniter.ConfigDefault, mbr, 1024)
rsp := ReadPrometheusStyleResult(iter, Options{})
require.ErrorContains(t, rsp.Error, "response body too large")
})
}
}
}
// FIXME:
//
//lint:ignore U1000 Ignore used function for now
@ -58,6 +83,7 @@ func runScenario(name string, opts Options) func(t *testing.T) {
require.Error(t, rsp.Error)
return
}
require.NoError(t, rsp.Error)
fname := name + "-frame"
experimental.CheckGoldenJSONResponse(t, "testdata", fname, &rsp, update)
@ -70,12 +96,17 @@ func TestTimeConversions(t *testing.T) {
time.Date(2020, time.September, 14, 15, 22, 25, 479000000, time.UTC),
timeFromFloat(1600096945.479))
ti, err := timeFromLokiString("1645030246277587968")
require.NoError(t, err)
// Loki date parsing
assert.Equal(t,
time.Date(2022, time.February, 16, 16, 50, 46, 277587968, time.UTC),
timeFromLokiString("1645030246277587968"))
ti)
ti, err = timeFromLokiString("2000000000000000000")
require.NoError(t, err)
assert.Equal(t,
time.Date(2033, time.May, 18, 3, 33, 20, 0, time.UTC),
timeFromLokiString("2000000000000000000"))
ti)
}