mirror of
https://github.com/grafana/grafana.git
synced 2025-02-25 18:55:37 -06:00
Graphite: Process multiple queries to Graphite plugin (#59608)
* make create call consistent with update and delete
* send multiple targets to graphite and correlate the responses with the requests
* make create call consistent with update and delete
* send multiple targets to graphite and correlate the responses with the requests
* Revert "make create call consistent with update and delete"
This reverts commit 26b6463bd6
.
* refactor query -> target parsing and fix unit tests
* add additional validations and more unit tests
* change error statement to warn
This commit is contained in:
parent
e1e858323a
commit
0c560b8b0d
@ -111,39 +111,24 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
||||
"until": []string{until},
|
||||
"format": []string{"json"},
|
||||
"maxDataPoints": []string{"500"},
|
||||
"target": []string{},
|
||||
}
|
||||
|
||||
// Calculate and get the last target of Graphite Request
|
||||
var target string
|
||||
emptyQueries := make([]string, 0)
|
||||
for _, query := range req.Queries {
|
||||
model, err := simplejson.NewJson(query.JSON)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
logger.Debug("graphite", "query", model)
|
||||
currTarget := ""
|
||||
if fullTarget, err := model.Get(TargetFullModelField).String(); err == nil {
|
||||
currTarget = fullTarget
|
||||
} else {
|
||||
currTarget = model.Get(TargetModelField).MustString()
|
||||
}
|
||||
if currTarget == "" {
|
||||
logger.Debug("graphite", "empty query target", model)
|
||||
emptyQueries = append(emptyQueries, fmt.Sprintf("Query: %v has no target", model))
|
||||
continue
|
||||
}
|
||||
target = fixIntervalFormat(currTarget)
|
||||
// Convert datasource query to graphite target request
|
||||
targetList, emptyQueries, origRefIds, err := s.processQueries(logger, req.Queries)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var result = backend.QueryDataResponse{}
|
||||
|
||||
if target == "" {
|
||||
logger.Error("No targets in query model", "models without targets", strings.Join(emptyQueries, "\n"))
|
||||
return &result, errors.New("no query target found for the alert rule")
|
||||
if len(emptyQueries) != 0 {
|
||||
logger.Warn("Found query models without targets", "models without targets", strings.Join(emptyQueries, "\n"))
|
||||
// If no queries had a valid target, return an error; otherwise, attempt with the targets we have
|
||||
if len(emptyQueries) == len(req.Queries) {
|
||||
return &result, errors.New("no query target found for the alert rule")
|
||||
}
|
||||
}
|
||||
|
||||
formData["target"] = []string{target}
|
||||
formData["target"] = targetList
|
||||
|
||||
if setting.Env == setting.Dev {
|
||||
logger.Debug("Graphite request", "params", formData)
|
||||
@ -157,7 +142,8 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
||||
ctx, span := s.tracer.Start(ctx, "graphite query")
|
||||
defer span.End()
|
||||
|
||||
span.SetAttributes("target", target, attribute.Key("target").String(target))
|
||||
targetStr := strings.Join(formData["target"], ",")
|
||||
span.SetAttributes("target", targetStr, attribute.Key("target").String(targetStr))
|
||||
span.SetAttributes("from", from, attribute.Key("from").String(from))
|
||||
span.SetAttributes("until", until, attribute.Key("until").String(until))
|
||||
span.SetAttributes("datasource_id", dsInfo.Id, attribute.Key("datasource_id").Int64(dsInfo.Id))
|
||||
@ -175,7 +161,7 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
||||
return &result, err
|
||||
}
|
||||
|
||||
frames, err := s.toDataFrames(logger, res)
|
||||
frames, err := s.toDataFrames(logger, res, origRefIds)
|
||||
if err != nil {
|
||||
span.RecordError(err)
|
||||
span.SetStatus(codes.Error, err.Error())
|
||||
@ -186,13 +172,57 @@ func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest)
|
||||
Responses: make(backend.Responses),
|
||||
}
|
||||
|
||||
result.Responses["A"] = backend.DataResponse{
|
||||
Frames: frames,
|
||||
for _, f := range frames {
|
||||
result.Responses[f.Name] = backend.DataResponse{
|
||||
Frames: data.Frames{f},
|
||||
}
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// processQueries converts each datasource query to a graphite query target. It returns the list of
|
||||
// targets, a list of invalid queries, and a mapping of formatted refIds (used in the target query)
|
||||
// to original query refIds, later used to associate ressponses with the original queries
|
||||
func (s *Service) processQueries(logger log.Logger, queries []backend.DataQuery) ([]string, []string, map[string]string, error) {
|
||||
emptyQueries := make([]string, 0)
|
||||
origRefIds := make(map[string]string, 0)
|
||||
targets := make([]string, 0)
|
||||
|
||||
for _, query := range queries {
|
||||
model, err := simplejson.NewJson(query.JSON)
|
||||
if err != nil {
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
logger.Debug("graphite", "query", model)
|
||||
currTarget := ""
|
||||
if fullTarget, err := model.Get(TargetFullModelField).String(); err == nil {
|
||||
currTarget = fullTarget
|
||||
} else {
|
||||
currTarget = model.Get(TargetModelField).MustString()
|
||||
}
|
||||
if currTarget == "" {
|
||||
logger.Debug("graphite", "empty query target", model)
|
||||
emptyQueries = append(emptyQueries, fmt.Sprintf("Query: %v has no target", model))
|
||||
continue
|
||||
}
|
||||
target := fixIntervalFormat(currTarget)
|
||||
|
||||
// This is a somewhat inglorious way to ensure we can associate results with the right query
|
||||
// By using aliasSub, we can get back a resolved series Target name (accounting for other aliases)
|
||||
// And the original refId. Since there are no restrictions on refId, we need to format it to make it
|
||||
// easy to find in the response
|
||||
formattedRefId := strings.ReplaceAll(query.RefID, " ", "_")
|
||||
origRefIds[formattedRefId] = query.RefID
|
||||
// This will set the alias to `<resolvedSeriesName> <formattedRefId>`
|
||||
// e.g. aliasSub(alias(myquery, "foo"), "(^.*$)", "\1 A") will return "foo A"
|
||||
target = fmt.Sprintf("aliasSub(%s,\"(^.*$)\",\"\\1 %s\")", target, formattedRefId)
|
||||
targets = append(targets, target)
|
||||
}
|
||||
|
||||
return targets, emptyQueries, origRefIds, nil
|
||||
}
|
||||
|
||||
func (s *Service) parseResponse(logger log.Logger, res *http.Response) ([]TargetResponseDTO, error) {
|
||||
body, err := io.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
@ -219,7 +249,7 @@ func (s *Service) parseResponse(logger log.Logger, res *http.Response) ([]Target
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func (s *Service) toDataFrames(logger log.Logger, response *http.Response) (frames data.Frames, error error) {
|
||||
func (s *Service) toDataFrames(logger log.Logger, response *http.Response, origRefIds map[string]string) (frames data.Frames, error error) {
|
||||
responseData, err := s.parseResponse(logger, response)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -229,7 +259,18 @@ func (s *Service) toDataFrames(logger log.Logger, response *http.Response) (fram
|
||||
for _, series := range responseData {
|
||||
timeVector := make([]time.Time, 0, len(series.DataPoints))
|
||||
values := make([]*float64, 0, len(series.DataPoints))
|
||||
name := series.Target
|
||||
// series.Target will be in the format <resolvedSeriesName> <formattedRefId>
|
||||
ls := strings.LastIndex(series.Target, " ")
|
||||
if ls == -1 {
|
||||
return nil, fmt.Errorf("received graphite response with invalid target format: %s", series.Target)
|
||||
}
|
||||
target := series.Target[:ls]
|
||||
formattedRefId := series.Target[ls+1:]
|
||||
refId, ok := origRefIds[formattedRefId]
|
||||
if !ok {
|
||||
logger.Warn("Unable to find refId associated with provided formattedRefId", "formattedRefId", formattedRefId)
|
||||
refId = formattedRefId // fallback - shouldn't happen except for in tests
|
||||
}
|
||||
|
||||
for _, dataPoint := range series.DataPoints {
|
||||
var timestamp, value, err = parseDataTimePoint(dataPoint)
|
||||
@ -250,9 +291,9 @@ func (s *Service) toDataFrames(logger log.Logger, response *http.Response) (fram
|
||||
}
|
||||
}
|
||||
|
||||
frames = append(frames, data.NewFrame(name,
|
||||
frames = append(frames, data.NewFrame(refId,
|
||||
data.NewField("time", nil, timeVector),
|
||||
data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name})))
|
||||
data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: target})))
|
||||
|
||||
if setting.Env == setting.Dev {
|
||||
logger.Debug("Graphite response", "target", series.Target, "datapoints", len(series.DataPoints))
|
||||
|
@ -1,7 +1,9 @@
|
||||
package graphite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"reflect"
|
||||
@ -9,7 +11,10 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
|
||||
"github.com/grafana/grafana-plugin-sdk-go/data"
|
||||
"github.com/grafana/grafana/pkg/components/simplejson"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@ -48,27 +53,123 @@ func TestFixIntervalFormat(t *testing.T) {
|
||||
assert.Equal(t, tc.expected, tr)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestProcessQueries(t *testing.T) {
|
||||
service := &Service{}
|
||||
log := logger.FromContext(context.Background())
|
||||
t.Run("Parses single valid query", func(t *testing.T) {
|
||||
queries := []backend.DataQuery{
|
||||
{
|
||||
RefID: "A",
|
||||
JSON: []byte(`{
|
||||
"target": "app.grafana.*.dashboards.views.1M.count"
|
||||
}`),
|
||||
},
|
||||
}
|
||||
targets, invalids, mapping, err := service.processQueries(log, queries)
|
||||
assert.NoError(t, err)
|
||||
assert.Empty(t, invalids)
|
||||
assert.Len(t, mapping, 1)
|
||||
assert.Len(t, targets, 1)
|
||||
assert.Equal(t, "aliasSub(app.grafana.*.dashboards.views.1M.count,\"(^.*$)\",\"\\1 A\")", targets[0])
|
||||
})
|
||||
|
||||
t.Run("Parses multiple valid queries with refId mappings", func(t *testing.T) {
|
||||
queries := []backend.DataQuery{
|
||||
{
|
||||
RefID: "A",
|
||||
JSON: []byte(`{
|
||||
"target": "app.grafana.*.dashboards.views.1M.count"
|
||||
}`),
|
||||
},
|
||||
{
|
||||
RefID: "query B",
|
||||
JSON: []byte(`{
|
||||
"target": "aliasByNode(hitcount(averageSeries(app.grafana.*.dashboards.views.count), '1mon'), 4)"
|
||||
}`),
|
||||
},
|
||||
}
|
||||
targets, invalids, mapping, err := service.processQueries(log, queries)
|
||||
assert.NoError(t, err)
|
||||
assert.Empty(t, invalids)
|
||||
assert.Len(t, mapping, 2)
|
||||
assert.Len(t, targets, 2)
|
||||
assert.Equal(t, "aliasSub(app.grafana.*.dashboards.views.1M.count,\"(^.*$)\",\"\\1 A\")", targets[0])
|
||||
assert.Equal(t, "aliasSub(aliasByNode(hitcount(averageSeries(app.grafana.*.dashboards.views.count), '1mon'), 4),\"(^.*$)\",\"\\1 query_B\")", targets[1])
|
||||
})
|
||||
|
||||
t.Run("Parses multiple queries with one invalid", func(t *testing.T) {
|
||||
queries := []backend.DataQuery{
|
||||
{
|
||||
RefID: "A",
|
||||
JSON: []byte(`{
|
||||
"target": "app.grafana.*.dashboards.views.1M.count"
|
||||
}`),
|
||||
},
|
||||
{
|
||||
RefID: "B",
|
||||
JSON: []byte(`{
|
||||
"query": "app.grafana.*.dashboards.views.1M.count"
|
||||
}`),
|
||||
},
|
||||
}
|
||||
targets, invalids, mapping, err := service.processQueries(log, queries)
|
||||
assert.NoError(t, err)
|
||||
assert.Len(t, invalids, 1)
|
||||
assert.Len(t, mapping, 1)
|
||||
assert.Len(t, targets, 1)
|
||||
json, _ := simplejson.NewJson(queries[1].JSON)
|
||||
expectedInvalid := fmt.Sprintf("Query: %v has no target", json)
|
||||
assert.Equal(t, expectedInvalid, invalids[0])
|
||||
})
|
||||
|
||||
t.Run("QueryData with no valid queries returns an error", func(t *testing.T) {
|
||||
queries := []backend.DataQuery{
|
||||
{
|
||||
RefID: "A",
|
||||
JSON: []byte(`{
|
||||
"query": "app.grafana.*.dashboards.views.1M.count"
|
||||
}`),
|
||||
},
|
||||
{
|
||||
RefID: "B",
|
||||
JSON: []byte(`{
|
||||
"query": "app.grafana.*.dashboards.views.1M.count"
|
||||
}`),
|
||||
},
|
||||
}
|
||||
|
||||
service.im = fakeInstanceManager{}
|
||||
_, err := service.QueryData(context.Background(), &backend.QueryDataRequest{
|
||||
Queries: queries,
|
||||
})
|
||||
assert.Error(t, err)
|
||||
assert.Equal(t, err.Error(), "no query target found for the alert rule")
|
||||
})
|
||||
}
|
||||
|
||||
func TestConvertResponses(t *testing.T) {
|
||||
service := &Service{}
|
||||
|
||||
t.Run("Converts response without tags to data frames", func(*testing.T) {
|
||||
body := `
|
||||
[
|
||||
{
|
||||
"target": "target",
|
||||
"target": "target A",
|
||||
"datapoints": [[50, 1], [null, 2], [100, 3]]
|
||||
}
|
||||
]`
|
||||
a := 50.0
|
||||
b := 100.0
|
||||
expectedFrame := data.NewFrame("target",
|
||||
expectedFrame := data.NewFrame("A",
|
||||
data.NewField("time", nil, []time.Time{time.Unix(1, 0).UTC(), time.Unix(2, 0).UTC(), time.Unix(3, 0).UTC()}),
|
||||
data.NewField("value", data.Labels{}, []*float64{&a, nil, &b}).SetConfig(&data.FieldConfig{DisplayNameFromDS: "target"}),
|
||||
)
|
||||
expectedFrames := data.Frames{expectedFrame}
|
||||
|
||||
httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))}
|
||||
dataFrames, err := service.toDataFrames(logger, httpResponse)
|
||||
dataFrames, err := service.toDataFrames(logger, httpResponse, map[string]string{})
|
||||
|
||||
require.NoError(t, err)
|
||||
if !reflect.DeepEqual(expectedFrames, dataFrames) {
|
||||
@ -82,14 +183,14 @@ func TestFixIntervalFormat(t *testing.T) {
|
||||
body := `
|
||||
[
|
||||
{
|
||||
"target": "target",
|
||||
"target": "target A",
|
||||
"tags": { "fooTag": "fooValue", "barTag": "barValue", "int": 100, "float": 3.14 },
|
||||
"datapoints": [[50, 1], [null, 2], [100, 3]]
|
||||
}
|
||||
]`
|
||||
a := 50.0
|
||||
b := 100.0
|
||||
expectedFrame := data.NewFrame("target",
|
||||
expectedFrame := data.NewFrame("A",
|
||||
data.NewField("time", nil, []time.Time{time.Unix(1, 0).UTC(), time.Unix(2, 0).UTC(), time.Unix(3, 0).UTC()}),
|
||||
data.NewField("value", data.Labels{
|
||||
"fooTag": "fooValue",
|
||||
@ -101,7 +202,7 @@ func TestFixIntervalFormat(t *testing.T) {
|
||||
expectedFrames := data.Frames{expectedFrame}
|
||||
|
||||
httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))}
|
||||
dataFrames, err := service.toDataFrames(logger, httpResponse)
|
||||
dataFrames, err := service.toDataFrames(logger, httpResponse, map[string]string{})
|
||||
|
||||
require.NoError(t, err)
|
||||
if !reflect.DeepEqual(expectedFrames, dataFrames) {
|
||||
@ -110,4 +211,89 @@ func TestFixIntervalFormat(t *testing.T) {
|
||||
t.Errorf("Data frames should have been equal but was, expected:\n%s\nactual:\n%s", expectedFramesJSON, dataFramesJSON)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Converts response with multiple targets", func(*testing.T) {
|
||||
body := `
|
||||
[
|
||||
{
|
||||
"target": "target 1 A",
|
||||
"datapoints": [[50, 1], [null, 2], [100, 3]]
|
||||
},
|
||||
{
|
||||
"target": "target 2 B",
|
||||
"datapoints": [[50, 1], [null, 2], [100, 3]]
|
||||
}
|
||||
]`
|
||||
a := 50.0
|
||||
b := 100.0
|
||||
expectedFrameA := data.NewFrame("A",
|
||||
data.NewField("time", nil, []time.Time{time.Unix(1, 0).UTC(), time.Unix(2, 0).UTC(), time.Unix(3, 0).UTC()}),
|
||||
data.NewField("value", data.Labels{}, []*float64{&a, nil, &b}).SetConfig(&data.FieldConfig{DisplayNameFromDS: "target 1"}),
|
||||
)
|
||||
expectedFrameB := data.NewFrame("B",
|
||||
data.NewField("time", nil, []time.Time{time.Unix(1, 0).UTC(), time.Unix(2, 0).UTC(), time.Unix(3, 0).UTC()}),
|
||||
data.NewField("value", data.Labels{}, []*float64{&a, nil, &b}).SetConfig(&data.FieldConfig{DisplayNameFromDS: "target 2"}),
|
||||
)
|
||||
expectedFrames := data.Frames{expectedFrameA, expectedFrameB}
|
||||
|
||||
httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))}
|
||||
dataFrames, err := service.toDataFrames(logger, httpResponse, map[string]string{})
|
||||
|
||||
require.NoError(t, err)
|
||||
if !reflect.DeepEqual(expectedFrames, dataFrames) {
|
||||
expectedFramesJSON, _ := json.Marshal(expectedFrames)
|
||||
dataFramesJSON, _ := json.Marshal(dataFrames)
|
||||
t.Errorf("Data frames should have been equal but was, expected:\n%s\nactual:\n%s", expectedFramesJSON, dataFramesJSON)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Converts response with refId mapping", func(*testing.T) {
|
||||
body := `
|
||||
[
|
||||
{
|
||||
"target": "target A_A",
|
||||
"datapoints": [[50, 1], [null, 2], [100, 3]]
|
||||
}
|
||||
]`
|
||||
a := 50.0
|
||||
b := 100.0
|
||||
expectedFrame := data.NewFrame("A A",
|
||||
data.NewField("time", nil, []time.Time{time.Unix(1, 0).UTC(), time.Unix(2, 0).UTC(), time.Unix(3, 0).UTC()}),
|
||||
data.NewField("value", data.Labels{}, []*float64{&a, nil, &b}).SetConfig(&data.FieldConfig{DisplayNameFromDS: "target"}),
|
||||
)
|
||||
expectedFrames := data.Frames{expectedFrame}
|
||||
|
||||
httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))}
|
||||
dataFrames, err := service.toDataFrames(logger, httpResponse, map[string]string{"A_A": "A A"})
|
||||
|
||||
require.NoError(t, err)
|
||||
if !reflect.DeepEqual(expectedFrames, dataFrames) {
|
||||
expectedFramesJSON, _ := json.Marshal(expectedFrames)
|
||||
dataFramesJSON, _ := json.Marshal(dataFrames)
|
||||
t.Errorf("Data frames should have been equal but was, expected:\n%s\nactual:\n%s", expectedFramesJSON, dataFramesJSON)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Chokes on response with invalid target name", func(*testing.T) {
|
||||
body := `
|
||||
[
|
||||
{
|
||||
"target": "target",
|
||||
"datapoints": [[50, 1], [null, 2], [100, 3]]
|
||||
}
|
||||
]`
|
||||
httpResponse := &http.Response{StatusCode: 200, Body: io.NopCloser(strings.NewReader(body))}
|
||||
_, err := service.toDataFrames(logger, httpResponse, map[string]string{})
|
||||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
type fakeInstanceManager struct{}
|
||||
|
||||
func (f fakeInstanceManager) Get(pluginContext backend.PluginContext) (instancemgmt.Instance, error) {
|
||||
return datasourceInfo{}, nil
|
||||
}
|
||||
|
||||
func (f fakeInstanceManager) Do(pluginContext backend.PluginContext, fn instancemgmt.InstanceCallbackFunc) error {
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user