Prometheus: Add flag to compare streaming and buffered responses (#51711)

* Add flag to compare streaming and buffered

* Move log

* Fix lint

* Fix tests

* Remove debug logs
This commit is contained in:
Andrej Ocenas 2022-07-28 14:26:51 +02:00 committed by GitHub
parent 2f9636e698
commit d0e548c3e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 120 additions and 36 deletions

View File

@ -50,6 +50,7 @@ export interface FeatureToggles {
datasourceQueryMultiStatus?: boolean;
traceToMetrics?: boolean;
prometheusStreamingJSONParser?: boolean;
prometheusStreamingJSONParserTest?: boolean;
validateDashboardsOnSave?: boolean;
autoMigrateGraphPanels?: boolean;
prometheusWideSeries?: boolean;

View File

@ -191,6 +191,11 @@ var (
Description: "Enable streaming JSON parser for Prometheus datasource",
State: FeatureStateBeta,
},
{
Name: "prometheusStreamingJSONParserTest",
Description: "Run both old and streaming requests and log differences",
State: FeatureStateBeta,
},
{
Name: "validateDashboardsOnSave",
Description: "Validate dashboard JSON POSTed to api/dashboards/db",

View File

@ -143,6 +143,10 @@ const (
// Enable streaming JSON parser for Prometheus datasource
FlagPrometheusStreamingJSONParser = "prometheusStreamingJSONParser"
// FlagPrometheusStreamingJSONParserTest
// Run both old and streaming requests and log differences
FlagPrometheusStreamingJSONParserTest = "prometheusStreamingJSONParserTest"
// FlagValidateDashboardsOnSave
// Validate dashboard JSON POSTed to api/dashboards/db
FlagValidateDashboardsOnSave = "validateDashboardsOnSave"

View File

@ -362,14 +362,14 @@ func matrixToDataFrames(matrix model.Matrix, query *PrometheusQuery, frames data
tags[string(k)] = string(v)
}
timeField := data.NewFieldFromFieldType(data.FieldTypeTime, len(v.Values))
valueField := data.NewFieldFromFieldType(data.FieldTypeNullableFloat64, len(v.Values))
valueField := data.NewFieldFromFieldType(data.FieldTypeFloat64, len(v.Values))
for i, k := range v.Values {
timeField.Set(i, k.Timestamp.Time().UTC())
value := float64(k.Value)
if !math.IsNaN(value) {
valueField.Set(i, &value)
valueField.Set(i, value)
}
}

View File

@ -723,8 +723,8 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
require.Equal(t, time.Unix(1, 0).UTC(), res[0].Fields[0].At(0))
require.Equal(t, time.Unix(4, 0).UTC(), res[0].Fields[0].At(1))
require.Equal(t, res[0].Fields[1].Len(), 2)
require.Equal(t, float64(1), *res[0].Fields[1].At(0).(*float64))
require.Equal(t, float64(4), *res[0].Fields[1].At(1).(*float64))
require.Equal(t, float64(1), res[0].Fields[1].At(0).(float64))
require.Equal(t, float64(4), res[0].Fields[1].At(1).(float64))
})
t.Run("matrix response with from alerting missed data points should be parsed correctly", func(t *testing.T) {
@ -780,9 +780,8 @@ func TestPrometheus_parseTimeSeriesResponse(t *testing.T) {
res, err := parseTimeSeriesResponse(value, query)
require.NoError(t, err)
var nilPointer *float64
require.Equal(t, res[0].Fields[1].Name, "Value")
require.Equal(t, res[0].Fields[1].At(0), nilPointer)
require.Equal(t, "Value", res[0].Fields[1].Name)
require.Equal(t, float64(0), res[0].Fields[1].At(0))
})
t.Run("vector response should be parsed normally", func(t *testing.T) {

View File

@ -2,8 +2,11 @@ package prometheus
import (
"context"
"encoding/json"
"errors"
"fmt"
"reflect"
"sync"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
@ -17,6 +20,8 @@ import (
"github.com/grafana/grafana/pkg/tsdb/prometheus/querydata"
"github.com/grafana/grafana/pkg/tsdb/prometheus/resource"
apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/yudai/gojsondiff"
"github.com/yudai/gojsondiff/formatter"
)
var plog = log.New("tsdb.prometheus")
@ -91,6 +96,36 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
return i.queryData.Execute(ctx, req)
}
// To test the new client implementation this can be run and we do 2 requests and compare.
if s.features.IsEnabled(featuremgmt.FlagPrometheusStreamingJSONParserTest) {
var wg sync.WaitGroup
var streamData *backend.QueryDataResponse
var streamError error
var data *backend.QueryDataResponse
var err error
plog.Debug("PrometheusStreamingJSONParserTest", "req", req)
wg.Add(1)
go func() {
defer wg.Done()
streamData, streamError = i.queryData.Execute(ctx, req)
}()
wg.Add(1)
go func() {
defer wg.Done()
data, err = i.buffered.ExecuteTimeSeriesQuery(ctx, req)
}()
wg.Wait()
// Report can take a while and we don't really need to wait for it.
go reportDiff(data, err, streamData, streamError)
return data, err
}
return i.buffered.ExecuteTimeSeriesQuery(ctx, req)
}
@ -131,3 +166,48 @@ func ConvertAPIError(err error) error {
}
return err
}
func reportDiff(data *backend.QueryDataResponse, err error, streamData *backend.QueryDataResponse, streamError error) {
if err == nil && streamError != nil {
plog.Debug("PrometheusStreamingJSONParserTest error in streaming client", "err", streamError)
}
if err != nil && streamError == nil {
plog.Debug("PrometheusStreamingJSONParserTest error in buffer but not streaming", "err", err)
}
if !reflect.DeepEqual(data, streamData) {
plog.Debug("PrometheusStreamingJSONParserTest buffer and streaming data are different")
dataJson, jsonErr := json.MarshalIndent(data, "", "\t")
if jsonErr != nil {
plog.Debug("PrometheusStreamingJSONParserTest error marshaling data", "jsonErr", jsonErr)
}
streamingJson, jsonErr := json.MarshalIndent(streamData, "", "\t")
if jsonErr != nil {
plog.Debug("PrometheusStreamingJSONParserTest error marshaling streaming data", "jsonErr", jsonErr)
}
differ := gojsondiff.New()
d, diffErr := differ.Compare(dataJson, streamingJson)
if diffErr != nil {
plog.Debug("PrometheusStreamingJSONParserTest diff error", "err", diffErr)
}
config := formatter.AsciiFormatterConfig{
ShowArrayIndex: true,
Coloring: true,
}
var aJson map[string]interface{}
unmarshallErr := json.Unmarshal(dataJson, &aJson)
if unmarshallErr != nil {
plog.Debug("PrometheusStreamingJSONParserTest unmarshall error", "err", unmarshallErr)
}
formatter := formatter.NewAsciiFormatter(aJson, config)
diffString, diffErr := formatter.Format(d)
if diffErr != nil {
plog.Debug("PrometheusStreamingJSONParserTest diff format error", "err", diffErr)
}
fmt.Println(diffString)
} else {
plog.Debug("PrometheusStreamingJSONParserTest responses are the same")
}
}

View File

@ -9,15 +9,15 @@
// }
// Name: 1 / 0
// Dimensions: 2 Fields by 3 Rows
// +-------------------------------+------------------+
// | Name: Time | Name: Value |
// | Labels: | Labels: |
// | Type: []time.Time | Type: []*float64 |
// +-------------------------------+------------------+
// | 2022-01-11 08:25:30 +0000 UTC | +Inf |
// | 2022-01-11 08:25:31 +0000 UTC | +Inf |
// | 2022-01-11 08:25:32 +0000 UTC | +Inf |
// +-------------------------------+------------------+
// +-------------------------------+-----------------+
// | Name: Time | Name: Value |
// | Labels: | Labels: |
// | Type: []time.Time | Type: []float64 |
// +-------------------------------+-----------------+
// | 2022-01-11 08:25:30 +0000 UTC | +Inf |
// | 2022-01-11 08:25:31 +0000 UTC | +Inf |
// | 2022-01-11 08:25:32 +0000 UTC | +Inf |
// +-------------------------------+-----------------+
//
//
// 🌟 This was machine generated. Do not edit. 🌟
@ -48,8 +48,7 @@
"name": "Value",
"type": "number",
"typeInfo": {
"frame": "float64",
"nullable": true
"frame": "float64"
},
"labels": {},
"config": {

View File

@ -12,7 +12,7 @@
// +-------------------------------+------------------------------------------------+
// | Name: Time | Name: Value |
// | Labels: | Labels: __name__=go_goroutines, job=prometheus |
// | Type: []time.Time | Type: []*float64 |
// | Type: []time.Time | Type: []float64 |
// +-------------------------------+------------------------------------------------+
// | 2022-01-11 08:25:33 +0000 UTC | 21 |
// | 2022-01-11 08:25:34 +0000 UTC | 32 |
@ -48,8 +48,7 @@
"name": "Value",
"type": "number",
"typeInfo": {
"frame": "float64",
"nullable": true
"frame": "float64"
},
"labels": {
"__name__": "go_goroutines",

View File

@ -12,11 +12,11 @@
// +-------------------------------+-----------------------------------------------------+
// | Name: Time | Name: Value |
// | Labels: | Labels: handler=/api/v1/query_range, job=prometheus |
// | Type: []time.Time | Type: []*float64 |
// | Type: []time.Time | Type: []float64 |
// +-------------------------------+-----------------------------------------------------+
// | 2022-01-11 08:25:30 +0000 UTC | null |
// | 2022-01-11 08:25:31 +0000 UTC | null |
// | 2022-01-11 08:25:32 +0000 UTC | null |
// | 2022-01-11 08:25:30 +0000 UTC | 0 |
// | 2022-01-11 08:25:31 +0000 UTC | 0 |
// | 2022-01-11 08:25:32 +0000 UTC | 0 |
// +-------------------------------+-----------------------------------------------------+
//
//
@ -48,8 +48,7 @@
"name": "Value",
"type": "number",
"typeInfo": {
"frame": "float64",
"nullable": true
"frame": "float64"
},
"labels": {
"handler": "/api/v1/query_range",
@ -69,9 +68,9 @@
1641889532000
],
[
null,
null,
null
0,
0,
0
]
]
}

View File

@ -12,7 +12,7 @@
// +-----------------------------------+--------------------------------------------------------------------------------------------------------+
// | Name: Time | Name: Value |
// | Labels: | Labels: __name__=prometheus_http_requests_total, code=200, handler=/api/v1/query_range, job=prometheus |
// | Type: []time.Time | Type: []*float64 |
// | Type: []time.Time | Type: []float64 |
// +-----------------------------------+--------------------------------------------------------------------------------------------------------+
// | 2022-01-11 08:25:30.123 +0000 UTC | 21 |
// | 2022-01-11 08:25:31.123 +0000 UTC | 32 |
@ -33,7 +33,7 @@
// +-----------------------------------+--------------------------------------------------------------------------------------------------------+
// | Name: Time | Name: Value |
// | Labels: | Labels: __name__=prometheus_http_requests_total, code=400, handler=/api/v1/query_range, job=prometheus |
// | Type: []time.Time | Type: []*float64 |
// | Type: []time.Time | Type: []float64 |
// +-----------------------------------+--------------------------------------------------------------------------------------------------------+
// | 2022-01-11 08:25:29.123 +0000 UTC | 54 |
// | 2022-01-11 08:25:32.123 +0000 UTC | 76 |
@ -68,8 +68,7 @@
"name": "Value",
"type": "number",
"typeInfo": {
"frame": "float64",
"nullable": true
"frame": "float64"
},
"labels": {
"__name__": "prometheus_http_requests_total",
@ -123,8 +122,7 @@
"name": "Value",
"type": "number",
"typeInfo": {
"frame": "float64",
"nullable": true
"frame": "float64"
},
"labels": {
"__name__": "prometheus_http_requests_total",