Prometheus: Convert Prometheus query response to dataframes (#31948)

* Convert Prometheus query response to dataframes

* Add parseResponse tests

* Fixed lint imports

* Force use UTC as timezone in timeVector

* Updated test to check timezone
This commit is contained in:
Dimitris Sotirakis 2021-03-18 08:40:26 +02:00 committed by GitHub
parent 5bb1b1602d
commit 2d72b4e69e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 61 additions and 15 deletions

View File

@ -12,7 +12,7 @@ import (
"net/http"
"github.com/grafana/grafana/pkg/components/null"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
@ -193,32 +193,32 @@ func (e *PrometheusExecutor) parseQuery(dsInfo *models.DataSource, query plugins
func parseResponse(value model.Value, query *PrometheusQuery) (plugins.DataQueryResult, error) {
var queryRes plugins.DataQueryResult
frames := data.Frames{}
data, ok := value.(model.Matrix)
matrix, ok := value.(model.Matrix)
if !ok {
return queryRes, fmt.Errorf("unsupported result format: %q", value.Type().String())
}
for _, v := range data {
series := plugins.DataTimeSeries{
Name: formatLegend(v.Metric, query),
Tags: make(map[string]string, len(v.Metric)),
Points: make([]plugins.DataTimePoint, 0, len(v.Values)),
}
for _, v := range matrix {
name := formatLegend(v.Metric, query)
tags := make(map[string]string, len(v.Metric))
timeVector := make([]time.Time, 0, len(v.Values))
values := make([]float64, 0, len(v.Values))
for k, v := range v.Metric {
series.Tags[string(k)] = string(v)
tags[string(k)] = string(v)
}
for _, k := range v.Values {
series.Points = append(series.Points, plugins.DataTimePoint{
null.FloatFrom(float64(k.Value)),
null.FloatFrom(float64(k.Timestamp.Unix() * 1000)),
})
timeVector = append(timeVector, time.Unix(k.Timestamp.Unix(), 0).UTC())
values = append(values, float64(k.Value))
}
queryRes.Series = append(queryRes.Series, series)
frames = append(frames, data.NewFrame(name,
data.NewField("time", nil, timeVector),
data.NewField("value", tags, values)))
}
queryRes.Dataframes = plugins.NewDecodedDataFrames(frames)
return queryRes, nil
}

View File

@ -142,3 +142,49 @@ func TestPrometheus(t *testing.T) {
require.Equal(t, time.Minute*2, models[0].Step)
})
}
func TestParseResponse(t *testing.T) {
t.Run("value is not of type matrix", func(t *testing.T) {
queryRes := plugins.DataQueryResult{}
value := p.Vector{}
res, err := parseResponse(value, nil)
require.Equal(t, queryRes, res)
require.Error(t, err)
})
t.Run("response should be parsed normally", func(t *testing.T) {
values := []p.SamplePair{
{Value: 1, Timestamp: 1000},
{Value: 2, Timestamp: 2000},
{Value: 3, Timestamp: 3000},
{Value: 4, Timestamp: 4000},
{Value: 5, Timestamp: 5000},
}
value := p.Matrix{
&p.SampleStream{
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
Values: values,
},
}
query := &PrometheusQuery{
LegendFormat: "legend {{app}}",
}
res, err := parseResponse(value, query)
require.NoError(t, err)
decoded, _ := res.Dataframes.Decoded()
require.Len(t, decoded, 1)
require.Equal(t, decoded[0].Name, "legend Application")
require.Len(t, decoded[0].Fields, 2)
require.Len(t, decoded[0].Fields[0].Labels, 0)
require.Equal(t, decoded[0].Fields[0].Name, "time")
require.Len(t, decoded[0].Fields[1].Labels, 2)
require.Equal(t, decoded[0].Fields[1].Labels.String(), "app=Application, tag2=tag2")
require.Equal(t, decoded[0].Fields[1].Name, "value")
// Ensure the timestamps are UTC zoned
testValue := decoded[0].Fields[0].At(0)
require.Equal(t, "UTC", testValue.(time.Time).Location().String())
})
}