Loki: Convert Loki query response to dataframes (#32316)

* Return dataFrames instead of series

* Add test for parseResponse

* Update pkg/tsdb/loki/loki.go

Co-authored-by: Dimitris Sotirakis <sotirakis.dim@gmail.com>

* Fix linting, add test

Co-authored-by: Dimitris Sotirakis <sotirakis.dim@gmail.com>
This commit is contained in:
Ivana Huckova 2021-03-26 16:47:46 +01:00 committed by GitHub
parent ccb563d572
commit 5ce9fa360d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 73 additions and 13 deletions

View File

@ -7,7 +7,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/grafana/grafana/pkg/components/null" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/log" "github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/plugins"
@ -146,32 +146,34 @@ func (e *LokiExecutor) parseQuery(dsInfo *models.DataSource, queryContext plugin
func parseResponse(value *loghttp.QueryResponse, query *lokiQuery) (plugins.DataQueryResult, error) { func parseResponse(value *loghttp.QueryResponse, query *lokiQuery) (plugins.DataQueryResult, error) {
var queryRes plugins.DataQueryResult var queryRes plugins.DataQueryResult
frames := data.Frames{}
//We are currently processing only matrix results (for alerting) //We are currently processing only matrix results (for alerting)
data, ok := value.Data.Result.(loghttp.Matrix) matrix, ok := value.Data.Result.(loghttp.Matrix)
if !ok { if !ok {
return queryRes, fmt.Errorf("unsupported result format: %q", value.Data.ResultType) return queryRes, fmt.Errorf("unsupported result format: %q", value.Data.ResultType)
} }
for _, v := range data { for _, v := range matrix {
series := plugins.DataTimeSeries{ name := formatLegend(v.Metric, query)
Name: formatLegend(v.Metric, query), tags := make(map[string]string, len(v.Metric))
Tags: make(map[string]string, len(v.Metric)), timeVector := make([]time.Time, 0, len(v.Values))
Points: make([]plugins.DataTimePoint, 0, len(v.Values)), values := make([]float64, 0, len(v.Values))
}
for k, v := range v.Metric { for k, v := range v.Metric {
series.Tags[string(k)] = string(v) tags[string(k)] = string(v)
} }
for _, k := range v.Values { for _, k := range v.Values {
series.Points = append(series.Points, plugins.DataTimePoint{ timeVector = append(timeVector, time.Unix(k.Timestamp.Unix(), 0).UTC())
null.FloatFrom(float64(k.Value)), null.FloatFrom(float64(k.Timestamp.Unix() * 1000)), values = append(values, float64(k.Value))
})
} }
queryRes.Series = append(queryRes.Series, series) frames = append(frames, data.NewFrame(name,
data.NewField("time", nil, timeVector),
data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name})))
} }
queryRes.Dataframes = plugins.NewDecodedDataFrames(frames)
return queryRes, nil return queryRes, nil
} }

View File

@ -7,6 +7,7 @@ import (
"github.com/grafana/grafana/pkg/components/simplejson" "github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models" "github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/loki/pkg/loghttp"
p "github.com/prometheus/common/model" p "github.com/prometheus/common/model"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -94,3 +95,60 @@ func TestLoki(t *testing.T) {
require.Equal(t, time.Second*2, models[0].Step) require.Equal(t, time.Second*2, models[0].Step)
}) })
} }
func TestParseResponse(t *testing.T) {
t.Run("value is not of type matrix", func(t *testing.T) {
queryRes := plugins.DataQueryResult{}
value := loghttp.QueryResponse{
Data: loghttp.QueryResponseData{
Result: loghttp.Vector{},
},
}
res, err := parseResponse(&value, nil)
require.Equal(t, queryRes, res)
require.Error(t, err)
})
t.Run("response should be parsed normally", func(t *testing.T) {
values := []p.SamplePair{
{Value: 1, Timestamp: 1000},
{Value: 2, Timestamp: 2000},
{Value: 3, Timestamp: 3000},
{Value: 4, Timestamp: 4000},
{Value: 5, Timestamp: 5000},
}
value := loghttp.QueryResponse{
Data: loghttp.QueryResponseData{
Result: loghttp.Matrix{
p.SampleStream{
Metric: p.Metric{"app": "Application", "tag2": "tag2"},
Values: values,
},
},
},
}
query := &lokiQuery{
LegendFormat: "legend {{app}}",
}
res, err := parseResponse(&value, query)
require.NoError(t, err)
decoded, _ := res.Dataframes.Decoded()
require.Len(t, decoded, 1)
require.Equal(t, decoded[0].Name, "legend Application")
require.Len(t, decoded[0].Fields, 2)
require.Len(t, decoded[0].Fields[0].Labels, 0)
require.Equal(t, decoded[0].Fields[0].Name, "time")
require.Len(t, decoded[0].Fields[1].Labels, 2)
require.Equal(t, decoded[0].Fields[1].Labels.String(), "app=Application, tag2=tag2")
require.Equal(t, decoded[0].Fields[1].Name, "value")
require.Equal(t, decoded[0].Fields[1].Config.DisplayNameFromDS, "legend Application")
// Ensure the timestamps are UTC zoned
testValue := decoded[0].Fields[0].At(0)
require.Equal(t, "UTC", testValue.(time.Time).Location().String())
})
}