Alerting: InfluxDB: InfluxQL: convert backend-code to use dataframes (#32950)

* alerting: influxdb: refactor unit-tests

* alerting: influxdb: converted code from timeseries-mode to dataframe-mode

* influxdb: simplify code

* influxdb: better function name

* influxdb: alerting: more tests

* influxdb: alerting: more tests

* influxdb: refactor

* influxdb: improved unit-test

* influxdb: simplified code

* influxdb: refactor reponse-parser code

* influxdb: refactor unit tests

* influxdb: alerting: use nicer names

Co-authored-by: dsotirakis <sotirakis.dim@gmail.com>
This commit is contained in:
Gábor Farkas 2021-04-22 08:43:17 +02:00 committed by GitHub
parent 8ea1470893
commit 7ff6665ac2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 394 additions and 161 deletions

View File

@ -2,7 +2,6 @@ package influxdb
import ( import (
"context" "context"
"encoding/json"
"errors" "errors"
"fmt" "fmt"
"net/http" "net/http"
@ -91,19 +90,9 @@ func (e *Executor) DataQuery(ctx context.Context, dsInfo *models.DataSource, tsd
return plugins.DataResponse{}, fmt.Errorf("InfluxDB returned error status: %s", resp.Status) return plugins.DataResponse{}, fmt.Errorf("InfluxDB returned error status: %s", resp.Status)
} }
var response Response
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
if err := dec.Decode(&response); err != nil {
return plugins.DataResponse{}, err
}
if response.Err != nil {
return plugins.DataResponse{}, response.Err
}
result := plugins.DataResponse{ result := plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{ Results: map[string]plugins.DataQueryResult{
"A": e.ResponseParser.Parse(&response, query), "A": e.ResponseParser.Parse(resp.Body, query),
}, },
} }

View File

@ -27,13 +27,13 @@ type Select []QueryPart
type Response struct { type Response struct {
Results []Result Results []Result
Err error Error string
} }
type Result struct { type Result struct {
Series []Row Series []Row
Messages []*Message Messages []*Message
Err error Error string
} }
type Message struct { type Message struct {

View File

@ -3,11 +3,13 @@ package influxdb
import ( import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/grafana/grafana/pkg/components/null" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/plugins" "github.com/grafana/grafana/pkg/plugins"
) )
@ -21,48 +23,79 @@ func init() {
legendFormat = regexp.MustCompile(`\[\[([\@\/\w-]+)(\.[\@\/\w-]+)*\]\]*|\$\s*([\@\/\w-]+?)*`) legendFormat = regexp.MustCompile(`\[\[([\@\/\w-]+)(\.[\@\/\w-]+)*\]\]*|\$\s*([\@\/\w-]+?)*`)
} }
func (rp *ResponseParser) Parse(response *Response, query *Query) plugins.DataQueryResult { func (rp *ResponseParser) Parse(buf io.ReadCloser, query *Query) plugins.DataQueryResult {
var queryRes plugins.DataQueryResult var queryRes plugins.DataQueryResult
response, jsonErr := parseJSON(buf)
if jsonErr != nil {
queryRes.Error = jsonErr
return queryRes
}
if response.Error != "" {
queryRes.Error = fmt.Errorf(response.Error)
return queryRes
}
frames := data.Frames{}
for _, result := range response.Results { for _, result := range response.Results {
queryRes.Series = append(queryRes.Series, rp.transformRows(result.Series, queryRes, query)...) frames = append(frames, transformRows(result.Series, query)...)
if result.Err != nil { if result.Error != "" {
queryRes.Error = result.Err queryRes.Error = fmt.Errorf(result.Error)
} }
} }
queryRes.Dataframes = plugins.NewDecodedDataFrames(frames)
return queryRes return queryRes
} }
func (rp *ResponseParser) transformRows(rows []Row, queryResult plugins.DataQueryResult, query *Query) plugins.DataTimeSeriesSlice { func parseJSON(buf io.ReadCloser) (Response, error) {
var result plugins.DataTimeSeriesSlice var response Response
dec := json.NewDecoder(buf)
dec.UseNumber()
err := dec.Decode(&response)
return response, err
}
func transformRows(rows []Row, query *Query) data.Frames {
frames := data.Frames{}
for _, row := range rows { for _, row := range rows {
for columnIndex, column := range row.Columns { for columnIndex, column := range row.Columns {
if column == "time" { if column == "time" {
continue continue
} }
var points plugins.DataTimeSeriesPoints var timeArray []time.Time
var valueArray []*float64
for _, valuePair := range row.Values { for _, valuePair := range row.Values {
point, err := rp.parseTimepoint(valuePair, columnIndex) timestamp, timestampErr := parseTimestamp(valuePair[0])
if err == nil { // we only add this row if the timestamp is valid
points = append(points, point) if timestampErr == nil {
value := parseValue(valuePair[columnIndex])
timeArray = append(timeArray, timestamp)
valueArray = append(valueArray, value)
} }
} }
result = append(result, plugins.DataTimeSeries{ name := formatFrameName(row, column, query)
Name: rp.formatSeriesName(row, column, query),
Points: points, timeField := data.NewField("time", nil, timeArray)
Tags: row.Tags, valueField := data.NewField("value", row.Tags, valueArray)
})
// set a nice name on the value-field
valueField.SetConfig(&data.FieldConfig{DisplayNameFromDS: name})
frames = append(frames, data.NewFrame(name, timeField, valueField))
} }
} }
return result return frames
} }
func (rp *ResponseParser) formatSeriesName(row Row, column string, query *Query) string { func formatFrameName(row Row, column string, query *Query) string {
if query.Alias == "" { if query.Alias == "" {
return rp.buildSeriesNameFromQuery(row, column) return buildFrameNameFromQuery(row, column)
} }
nameSegment := strings.Split(row.Name, ".") nameSegment := strings.Split(row.Name, ".")
@ -100,7 +133,7 @@ func (rp *ResponseParser) formatSeriesName(row Row, column string, query *Query)
return string(result) return string(result)
} }
func (rp *ResponseParser) buildSeriesNameFromQuery(row Row, column string) string { func buildFrameNameFromQuery(row Row, column string) string {
var tags []string var tags []string
for k, v := range row.Tags { for k, v := range row.Tags {
tags = append(tags, fmt.Sprintf("%s: %s", k, v)) tags = append(tags, fmt.Sprintf("%s: %s", k, v))
@ -114,36 +147,54 @@ func (rp *ResponseParser) buildSeriesNameFromQuery(row Row, column string) strin
return fmt.Sprintf("%s.%s%s", row.Name, column, tagText) return fmt.Sprintf("%s.%s%s", row.Name, column, tagText)
} }
func (rp *ResponseParser) parseTimepoint(valuePair []interface{}, valuePosition int) (plugins.DataTimePoint, error) { func parseTimestamp(value interface{}) (time.Time, error) {
value := rp.parseValue(valuePair[valuePosition]) timestampNumber, ok := value.(json.Number)
timestampNumber, ok := valuePair[0].(json.Number)
if !ok { if !ok {
return plugins.DataTimePoint{}, fmt.Errorf("valuePair[0] has invalid type: %#v", valuePair[0]) return time.Time{}, fmt.Errorf("timestamp-value has invalid type: %#v", value)
} }
timestamp, err := timestampNumber.Float64() timestampFloat, err := timestampNumber.Float64()
if err != nil { if err != nil {
return plugins.DataTimePoint{}, err return time.Time{}, err
} }
return plugins.DataTimePoint{value, null.FloatFrom(timestamp * 1000)}, nil // currently in the code the influxdb-timestamps are requested with
// seconds-precision, meaning these values are seconds
t := time.Unix(int64(timestampFloat), 0).UTC()
return t, nil
} }
func (rp *ResponseParser) parseValue(value interface{}) null.Float { func parseValue(value interface{}) *float64 {
// NOTE: we use pointers-to-float64 because we need
// to represent null-json-values. they come for example
// when we do a group-by with fill(null)
// FIXME: the value of an influxdb-query can be:
// - string
// - float
// - integer
// - boolean
//
// here we only handle numeric values. this is probably
// enough for alerting, but later if we want to support
// arbitrary queries, we will have to improve the code
if value == nil {
// this is what json-nulls become
return nil
}
number, ok := value.(json.Number) number, ok := value.(json.Number)
if !ok { if !ok {
return null.FloatFromPtr(nil) // in the current inmplementation, errors become nils
return nil
} }
fvalue, err := number.Float64() fvalue, err := number.Float64()
if err == nil { if err != nil {
return null.FloatFrom(fvalue) // in the current inmplementation, errors become nils
return nil
} }
ivalue, err := number.Int64() return &fvalue
if err == nil {
return null.FloatFrom(float64(ivalue))
}
return null.FloatFromPtr(nil)
} }

View File

@ -2,200 +2,393 @@ package influxdb
import ( import (
"encoding/json" "encoding/json"
"fmt" "io"
"io/ioutil"
"strings"
"testing" "testing"
"time"
"github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/plugins"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func prepare(text string) io.ReadCloser {
return ioutil.NopCloser(strings.NewReader(text))
}
func decodedFrames(t *testing.T, result plugins.DataQueryResult) data.Frames {
decoded, err := result.Dataframes.Decoded()
require.NoError(t, err)
return decoded
}
func assertSeriesName(t *testing.T, result plugins.DataQueryResult, index int, name string) {
decoded := decodedFrames(t, result)
frame := decoded[index]
require.Equal(t, frame.Name, name)
// the current version of the alerting-code does not use the dataframe-name
// when generating the metric-names for the alerts.
// instead, it goes through multiple attributes on the Field.
// we use the `field.Config.DisplayNameFromDS` attribute.
valueFieldConfig := frame.Fields[1].Config
require.NotNil(t, valueFieldConfig)
require.Equal(t, valueFieldConfig.DisplayNameFromDS, name)
}
func TestInfluxdbResponseParser(t *testing.T) { func TestInfluxdbResponseParser(t *testing.T) {
t.Run("Influxdb response parser should parse everything normally", func(t *testing.T) { t.Run("Influxdb response parser should handle invalid JSON", func(t *testing.T) {
parser := &ResponseParser{} parser := &ResponseParser{}
cfg := setting.NewCfg() response := `{ invalid }`
err := cfg.Load(&setting.CommandLineArgs{
HomePath: "../../../",
})
require.NoError(t, err)
response := &Response{
Results: []Result{
{
Series: []Row{
{
Name: "cpu",
Columns: []string{"time", "mean", "sum"},
Tags: map[string]string{"datacenter": "America"},
Values: [][]interface{}{
{json.Number("111"), json.Number("222"), json.Number("333")},
{json.Number("111"), json.Number("222"), json.Number("333")},
{json.Number("111"), json.Number("null"), json.Number("333")},
},
},
},
},
},
}
query := &Query{} query := &Query{}
result := parser.Parse(response, query) result := parser.Parse(prepare(response), query)
require.Len(t, result.Series, 2) require.Nil(t, result.Dataframes)
require.Error(t, result.Error)
})
require.Len(t, result.Series[0].Points, 3) t.Run("Influxdb response parser should parse everything normally", func(t *testing.T) {
require.Len(t, result.Series[1].Points, 3) parser := &ResponseParser{}
require.Equal(t, result.Series[0].Points[1][0].Float64, float64(222)) response := `
require.Equal(t, result.Series[1].Points[1][0].Float64, float64(333)) {
"results": [
{
"series": [
{
"name": "cpu",
"columns": ["time","mean","sum"],
"tags": {"datacenter": "America"},
"values": [
[111,222,333],
[111,222,333],
[111,null,333]
]
}
]
}
]
}
`
require.False(t, result.Series[0].Points[2][0].Valid) query := &Query{}
require.Equal(t, result.Series[0].Name, "cpu.mean { datacenter: America }") result := parser.Parse(prepare(response), query)
require.Equal(t, result.Series[1].Name, "cpu.sum { datacenter: America }")
decoded := decodedFrames(t, result)
require.Len(t, decoded, 2)
frame1 := decoded[0]
frame2 := decoded[1]
assertSeriesName(t, result, 0, "cpu.mean { datacenter: America }")
assertSeriesName(t, result, 1, "cpu.sum { datacenter: America }")
require.Len(t, frame1.Fields, 2)
require.Len(t, frame2.Fields, 2)
require.Equal(t, frame1.Fields[0].Len(), 3)
require.Equal(t, frame1.Fields[1].Len(), 3)
require.Equal(t, frame2.Fields[0].Len(), 3)
require.Equal(t, frame2.Fields[1].Len(), 3)
require.Equal(t, *frame1.Fields[1].At(1).(*float64), 222.0)
require.Equal(t, *frame2.Fields[1].At(1).(*float64), 333.0)
require.Nil(t, frame1.Fields[1].At(2))
})
t.Run("Influxdb response parser with invalid value-format", func(t *testing.T) {
parser := &ResponseParser{}
response := `
{
"results": [
{
"series": [
{
"name": "cpu",
"columns": ["time","mean"],
"values": [
[100,50],
[101,"hello"],
[102,52]
]
}
]
}
]
}
`
query := &Query{}
result := parser.Parse(prepare(response), query)
// the current behavior is that we do not report an error, we turn the invalid value into `nil`
require.Nil(t, result.Error)
require.Equal(t, result.ErrorString, "")
decoded := decodedFrames(t, result)
require.Len(t, decoded, 1)
frame := decoded[0]
require.Len(t, frame.Fields, 2)
field1 := frame.Fields[0]
field2 := frame.Fields[1]
require.Equal(t, field1.Len(), 3)
require.Equal(t, field2.Len(), 3)
require.Equal(t, *field2.At(0).(*float64), 50.0)
require.Nil(t, field2.At(1))
require.Equal(t, *field2.At(2).(*float64), 52.0)
})
t.Run("Influxdb response parser with invalid timestamp-format", func(t *testing.T) {
parser := &ResponseParser{}
response := `
{
"results": [
{
"series": [
{
"name": "cpu",
"columns": ["time","mean"],
"values": [
[100,50],
["hello",51],
["hello","hello"],
[102,52]
]
}
]
}
]
}
`
query := &Query{}
result := parser.Parse(prepare(response), query)
// the current behavior is that we do not report an error, we skip the item with the invalid timestamp
require.Nil(t, result.Error)
require.Equal(t, result.ErrorString, "")
decoded := decodedFrames(t, result)
require.Len(t, decoded, 1)
frame := decoded[0]
require.Len(t, frame.Fields, 2)
field1 := frame.Fields[0]
field2 := frame.Fields[1]
require.Equal(t, field1.Len(), 2)
require.Equal(t, field2.Len(), 2)
require.Equal(t, *field2.At(0).(*float64), 50.0)
require.Equal(t, *field2.At(1).(*float64), 52.0)
}) })
t.Run("Influxdb response parser with alias", func(t *testing.T) { t.Run("Influxdb response parser with alias", func(t *testing.T) {
parser := &ResponseParser{} parser := &ResponseParser{}
response := &Response{ response := `
Results: []Result{ {
"results": [
{ {
Series: []Row{ "series": [
{ {
Name: "cpu.upc", "name": "cpu.upc",
Columns: []string{"time", "mean", "sum"}, "columns": ["time","mean","sum"],
Tags: map[string]string{ "tags": {
"datacenter": "America", "datacenter": "America",
"dc.region.name": "Northeast", "dc.region.name": "Northeast",
"cluster-name": "Cluster", "cluster-name": "Cluster",
"/cluster/name/": "Cluster/", "/cluster/name/": "Cluster/",
"@cluster@name@": "Cluster@", "@cluster@name@": "Cluster@"
}, },
Values: [][]interface{}{ "values": [
{json.Number("111"), json.Number("222"), json.Number("333")}, [111,222,333]
}, ]
}, }
}, ]
}, }
}, ]
} }
`
query := &Query{Alias: "series alias"} query := &Query{Alias: "series alias"}
result := parser.Parse(response, query) result := parser.Parse(prepare(response), query)
require.Equal(t, result.Series[0].Name, "series alias") assertSeriesName(t, result, 0, "series alias")
query = &Query{Alias: "alias $m $measurement", Measurement: "10m"} query = &Query{Alias: "alias $m $measurement", Measurement: "10m"}
result = parser.Parse(response, query) result = parser.Parse(prepare(response), query)
require.Equal(t, result.Series[0].Name, "alias 10m 10m") assertSeriesName(t, result, 0, "alias 10m 10m")
query = &Query{Alias: "alias $col", Measurement: "10m"} query = &Query{Alias: "alias $col", Measurement: "10m"}
result = parser.Parse(response, query) result = parser.Parse(prepare(response), query)
require.Equal(t, result.Series[0].Name, "alias mean") assertSeriesName(t, result, 0, "alias mean")
require.Equal(t, result.Series[1].Name, "alias sum") assertSeriesName(t, result, 1, "alias sum")
query = &Query{Alias: "alias $tag_datacenter"} query = &Query{Alias: "alias $tag_datacenter"}
result = parser.Parse(response, query) result = parser.Parse(prepare(response), query)
require.Equal(t, result.Series[0].Name, "alias America") assertSeriesName(t, result, 0, "alias America")
query = &Query{Alias: "alias $1"} query = &Query{Alias: "alias $1"}
result = parser.Parse(response, query) result = parser.Parse(prepare(response), query)
require.Equal(t, result.Series[0].Name, "alias upc") assertSeriesName(t, result, 0, "alias upc")
query = &Query{Alias: "alias $5"} query = &Query{Alias: "alias $5"}
result = parser.Parse(response, query) result = parser.Parse(prepare(response), query)
require.Equal(t, result.Series[0].Name, "alias $5") assertSeriesName(t, result, 0, "alias $5")
query = &Query{Alias: "series alias"} query = &Query{Alias: "series alias"}
result = parser.Parse(response, query) result = parser.Parse(prepare(response), query)
require.Equal(t, result.Series[0].Name, "series alias") assertSeriesName(t, result, 0, "series alias")
query = &Query{Alias: "alias [[m]] [[measurement]]", Measurement: "10m"} query = &Query{Alias: "alias [[m]] [[measurement]]", Measurement: "10m"}
result = parser.Parse(response, query) result = parser.Parse(prepare(response), query)
require.Equal(t, result.Series[0].Name, "alias 10m 10m") assertSeriesName(t, result, 0, "alias 10m 10m")
query = &Query{Alias: "alias [[col]]", Measurement: "10m"} query = &Query{Alias: "alias [[col]]", Measurement: "10m"}
result = parser.Parse(response, query) result = parser.Parse(prepare(response), query)
require.Equal(t, result.Series[0].Name, "alias mean") assertSeriesName(t, result, 0, "alias mean")
require.Equal(t, result.Series[1].Name, "alias sum") assertSeriesName(t, result, 1, "alias sum")
query = &Query{Alias: "alias [[tag_datacenter]]"} query = &Query{Alias: "alias [[tag_datacenter]]"}
result = parser.Parse(response, query) result = parser.Parse(prepare(response), query)
require.Equal(t, result.Series[0].Name, "alias America") assertSeriesName(t, result, 0, "alias America")
query = &Query{Alias: "alias [[tag_dc.region.name]]"} query = &Query{Alias: "alias [[tag_dc.region.name]]"}
result = parser.Parse(response, query) result = parser.Parse(prepare(response), query)
require.Equal(t, result.Series[0].Name, "alias Northeast") assertSeriesName(t, result, 0, "alias Northeast")
query = &Query{Alias: "alias [[tag_cluster-name]]"} query = &Query{Alias: "alias [[tag_cluster-name]]"}
result = parser.Parse(response, query) result = parser.Parse(prepare(response), query)
require.Equal(t, result.Series[0].Name, "alias Cluster") assertSeriesName(t, result, 0, "alias Cluster")
query = &Query{Alias: "alias [[tag_/cluster/name/]]"} query = &Query{Alias: "alias [[tag_/cluster/name/]]"}
result = parser.Parse(response, query) result = parser.Parse(prepare(response), query)
require.Equal(t, result.Series[0].Name, "alias Cluster/") assertSeriesName(t, result, 0, "alias Cluster/")
query = &Query{Alias: "alias [[tag_@cluster@name@]]"} query = &Query{Alias: "alias [[tag_@cluster@name@]]"}
result = parser.Parse(response, query) result = parser.Parse(prepare(response), query)
require.Equal(t, result.Series[0].Name, "alias Cluster@") assertSeriesName(t, result, 0, "alias Cluster@")
}) })
t.Run("Influxdb response parser with errors", func(t *testing.T) { t.Run("Influxdb response parser with errors", func(t *testing.T) {
parser := &ResponseParser{} parser := &ResponseParser{}
cfg := setting.NewCfg() response := `
err := cfg.Load(&setting.CommandLineArgs{ {
HomePath: "../../../", "results": [
})
require.Nil(t, err)
response := &Response{
Results: []Result{
{ {
Series: []Row{ "series": [
{ {
Name: "cpu", "name": "cpu",
Columns: []string{"time", "mean", "sum"}, "columns": ["time","mean","sum"],
Tags: map[string]string{"datacenter": "America"}, "tags": {"datacenter": "America"},
Values: [][]interface{}{ "values": [
{json.Number("111"), json.Number("222"), json.Number("333")}, [111,222,333],
{json.Number("111"), json.Number("222"), json.Number("333")}, [111,222,333],
{json.Number("111"), json.Number("null"), json.Number("333")}, [111,null,333]
}, ]
}, }
}, ]
}, },
{ {
Err: fmt.Errorf("query-timeout limit exceeded"), "error": "query-timeout limit exceeded"
}, }
}, ]
} }
`
query := &Query{} query := &Query{}
result := parser.Parse(response, query) result := parser.Parse(prepare(response), query)
require.Len(t, result.Series, 2) decoded := decodedFrames(t, result)
require.Len(t, result.Series[0].Points, 3) require.Len(t, decoded, 2)
require.Len(t, result.Series[1].Points, 3)
require.Error(t, result.Error) require.Equal(t, decoded[0].Fields[0].Len(), 3)
require.Equal(t, result.Error.Error(), "query-timeout limit exceeded") require.Equal(t, decoded[0].Fields[1].Len(), 3)
require.Equal(t, decoded[1].Fields[0].Len(), 3)
require.Equal(t, decoded[1].Fields[1].Len(), 3)
require.EqualError(t, result.Error, "query-timeout limit exceeded")
})
t.Run("Influxdb response parser with top-level error", func(t *testing.T) {
parser := &ResponseParser{}
response := `
{
"error": "error parsing query: found THING"
}
`
query := &Query{}
result := parser.Parse(prepare(response), query)
require.Nil(t, result.Dataframes)
require.EqualError(t, result.Error, "error parsing query: found THING")
})
t.Run("Influxdb response parser parseValue nil", func(t *testing.T) {
value := parseValue(nil)
require.Nil(t, value)
})
t.Run("Influxdb response parser parseValue valid JSON.number", func(t *testing.T) {
value := parseValue(json.Number("95.4"))
require.Equal(t, *value, 95.4)
})
t.Run("Influxdb response parser parseValue invalid type", func(t *testing.T) {
value := parseValue("95.4")
require.Nil(t, value)
})
t.Run("Influxdb response parser parseTimestamp valid JSON.number", func(t *testing.T) {
// currently we use seconds-precision with influxdb, so the test works with that.
// if we change this to for example milliseconds-precision, the tests will have to change.
timestamp, err := parseTimestamp(json.Number("1609556645"))
require.NoError(t, err)
require.Equal(t, timestamp.Format(time.RFC3339), "2021-01-02T03:04:05Z")
})
t.Run("Influxdb response parser parseValue invalid type", func(t *testing.T) {
_, err := parseTimestamp("hello")
require.Error(t, err)
}) })
} }