Prometheus: Migrate to use SDK contracts (#37358)

* Use SDK contracts (test needs fixing)

* Fix tests

* Add customQueryParametersMiddleware

* Fix merge conflicts
This commit is contained in:
Dimitris Sotirakis 2021-08-09 12:11:19 +03:00 committed by GitHub
parent 2d33ddf37f
commit 27c71a1f09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 1097 additions and 1000 deletions

File diff suppressed because it is too large Load Diff

View File

@ -2,18 +2,25 @@ package prometheus
import (
"context"
"encoding/json"
"errors"
"fmt"
"regexp"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/tsdb/interval"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/api"
apiv1 "github.com/prometheus/client_golang/api/prometheus/v1"
@ -21,55 +28,104 @@ import (
)
var (
plog log.Logger
legendFormat *regexp.Regexp = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
safeRes int64 = 11000
plog = log.New("tsdb.prometheus")
legendFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
safeRes = 11000
)
type DatasourceInfo struct {
ID int64
HTTPClientOpts sdkhttpclient.Options
URL string
HTTPMethod string
}
func init() {
plog = log.New("tsdb.prometheus")
registry.Register(&registry.Descriptor{
Name: "PrometheusService",
InitPriority: registry.Low,
Instance: &Service{},
})
}
type PrometheusExecutor struct {
client apiv1.API
intervalCalculator interval.Calculator
type Service struct {
BackendPluginManager backendplugin.Manager `inject:""`
HTTPClientProvider httpclient.Provider `inject:""`
intervalCalculator tsdb.Calculator
im instancemgmt.InstanceManager
}
//nolint: staticcheck // plugins.DataPlugin deprecated
func New(provider httpclient.Provider) func(*models.DataSource) (plugins.DataPlugin, error) {
return func(dsInfo *models.DataSource) (plugins.DataPlugin, error) {
transport, err := dsInfo.GetHTTPTransport(provider, customQueryParametersMiddleware(plog))
func (s *Service) Init() error {
plog.Debug("initializing")
im := datasource.NewInstanceManager(newInstanceSettings())
factory := coreplugin.New(backend.ServeOpts{
QueryDataHandler: newService(im, s.HTTPClientProvider),
})
if err := s.BackendPluginManager.Register("prometheus", factory); err != nil {
plog.Error("Failed to register plugin", "error", err)
}
return nil
}
func newInstanceSettings() datasource.InstanceFactoryFunc {
return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
jsonData := map[string]interface{}{}
err := json.Unmarshal(settings.JSONData, &jsonData)
if err != nil {
return nil, err
return nil, fmt.Errorf("error reading settings: %w", err)
}
cfg := api.Config{
Address: dsInfo.Url,
RoundTripper: transport,
}
client, err := api.NewClient(cfg)
httpCliOpts, err := settings.HTTPClientOptions()
if err != nil {
return nil, err
return nil, fmt.Errorf("error getting http options: %w", err)
}
return &PrometheusExecutor{
intervalCalculator: interval.NewCalculator(interval.CalculatorOptions{MinInterval: time.Second * 1}),
client: apiv1.NewAPI(client),
}, nil
httpMethod, ok := jsonData["httpMethod"].(string)
if !ok {
return nil, errors.New("no http method provided")
}
mdl := DatasourceInfo{
ID: settings.ID,
URL: settings.URL,
HTTPClientOpts: httpCliOpts,
HTTPMethod: httpMethod,
}
return mdl, nil
}
}
// newService creates a new executor func.
func newService(im instancemgmt.InstanceManager, httpClientProvider httpclient.Provider) *Service {
return &Service{
im: im,
HTTPClientProvider: httpClientProvider,
intervalCalculator: tsdb.NewCalculator(),
}
}
//nolint: staticcheck // plugins.DataResponse deprecated
func (e *PrometheusExecutor) DataQuery(ctx context.Context, dsInfo *models.DataSource,
tsdbQuery plugins.DataQuery) (plugins.DataResponse, error) {
result := plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{},
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
if len(req.Queries) == 0 {
return &backend.QueryDataResponse{}, fmt.Errorf("query contains no queries")
}
queries, err := e.parseQuery(dsInfo, tsdbQuery)
dsInfo, err := s.getDSInfo(req.PluginContext)
if err != nil {
return result, err
return nil, err
}
client, err := getClient(dsInfo, s)
if err != nil {
return nil, err
}
result := backend.QueryDataResponse{
Responses: backend.Responses{},
}
queries, err := s.parseQuery(req.Queries)
if err != nil {
return &result, err
}
for _, query := range queries {
@ -87,20 +143,60 @@ func (e *PrometheusExecutor) DataQuery(ctx context.Context, dsInfo *models.DataS
span.SetTag("stop_unixnano", query.End.UnixNano())
defer span.Finish()
value, _, err := e.client.QueryRange(ctx, query.Expr, timeRange)
value, _, err := client.QueryRange(ctx, query.Expr, timeRange)
if err != nil {
return result, err
return &result, err
}
queryResult, err := parseResponse(value, query)
frame, err := parseResponse(value, query)
if err != nil {
return result, err
return &result, err
}
result.Responses[query.RefId] = backend.DataResponse{
Frames: frame,
}
result.Results[query.RefId] = queryResult
}
return result, nil
return &result, nil
}
func getClient(dsInfo *DatasourceInfo, s *Service) (apiv1.API, error) {
opts := &sdkhttpclient.Options{
Timeouts: dsInfo.HTTPClientOpts.Timeouts,
TLS: dsInfo.HTTPClientOpts.TLS,
}
customMiddlewares := customQueryParametersMiddleware(plog)
opts.Middlewares = []sdkhttpclient.Middleware{customMiddlewares}
roundTripper, err := s.HTTPClientProvider.GetTransport(*opts)
if err != nil {
return nil, err
}
cfg := api.Config{
Address: dsInfo.URL,
RoundTripper: roundTripper,
}
client, err := api.NewClient(cfg)
if err != nil {
return nil, err
}
return apiv1.NewAPI(client), nil
}
func (s *Service) getDSInfo(pluginCtx backend.PluginContext) (*DatasourceInfo, error) {
i, err := s.im.Get(pluginCtx)
if err != nil {
return nil, err
}
instance := i.(DatasourceInfo)
return &instance, nil
}
func formatLegend(metric model.Metric, query *PrometheusQuery) string {
@ -121,49 +217,47 @@ func formatLegend(metric model.Metric, query *PrometheusQuery) string {
return string(result)
}
func (e *PrometheusExecutor) parseQuery(dsInfo *models.DataSource, query plugins.DataQuery) (
func (s *Service) parseQuery(queries []backend.DataQuery) (
[]*PrometheusQuery, error) {
var intervalMode string
var adjustedInterval time.Duration
qs := []*PrometheusQuery{}
for _, queryModel := range query.Queries {
expr, err := queryModel.Model.Get("expr").String()
for _, queryModel := range queries {
jsonModel, err := simplejson.NewJson(queryModel.JSON)
if err != nil {
return nil, err
}
expr, err := jsonModel.Get("expr").String()
if err != nil {
return nil, err
}
format := queryModel.Model.Get("legendFormat").MustString("")
format := jsonModel.Get("legendFormat").MustString("")
start, err := query.TimeRange.ParseFrom()
if err != nil {
return nil, err
}
start := queryModel.TimeRange.From
end := queryModel.TimeRange.To
queryInterval := jsonModel.Get("interval").MustString("")
end, err := query.TimeRange.ParseTo()
if err != nil {
return nil, err
}
hasQueryInterval := queryModel.Model.Get("interval").MustString("") != ""
dsInterval, err := tsdb.GetIntervalFrom(queryInterval, "", 0, 15*time.Second)
hasQueryInterval := queryInterval != ""
// Only use stepMode if we have interval in query, otherwise use "min"
if hasQueryInterval {
intervalMode = queryModel.Model.Get("stepMode").MustString("min")
intervalMode = jsonModel.Get("stepMode").MustString("min")
} else {
intervalMode = "min"
}
// Calculate interval value from query or data source settings or use default value
intervalValue, err := interval.GetIntervalFrom(dsInfo, queryModel.Model, time.Second*15)
if err != nil {
return nil, err
}
calculatedInterval, err := e.intervalCalculator.Calculate(*query.TimeRange, intervalValue, intervalMode)
calculatedInterval, err := s.intervalCalculator.Calculate(queries[0].TimeRange, dsInterval, tsdb.IntervalMode(intervalMode))
if err != nil {
return nil, err
}
safeInterval := e.intervalCalculator.CalculateSafeInterval(*query.TimeRange, safeRes)
safeInterval := s.intervalCalculator.CalculateSafeInterval(queries[0].TimeRange, int64(safeRes))
if calculatedInterval.Value > safeInterval.Value {
adjustedInterval = calculatedInterval.Value
@ -171,7 +265,7 @@ func (e *PrometheusExecutor) parseQuery(dsInfo *models.DataSource, query plugins
adjustedInterval = safeInterval.Value
}
intervalFactor := queryModel.Model.Get("intervalFactor").MustInt64(1)
intervalFactor := jsonModel.Get("intervalFactor").MustInt64(1)
step := time.Duration(int64(adjustedInterval) * intervalFactor)
qs = append(qs, &PrometheusQuery{
@ -187,14 +281,12 @@ func (e *PrometheusExecutor) parseQuery(dsInfo *models.DataSource, query plugins
return qs, nil
}
//nolint: staticcheck // plugins.DataQueryResult deprecated
func parseResponse(value model.Value, query *PrometheusQuery) (plugins.DataQueryResult, error) {
var queryRes plugins.DataQueryResult
func parseResponse(value model.Value, query *PrometheusQuery) (data.Frames, error) {
frames := data.Frames{}
matrix, ok := value.(model.Matrix)
if !ok {
return queryRes, fmt.Errorf("unsupported result format: %q", value.Type().String())
return frames, fmt.Errorf("unsupported result format: %q", value.Type().String())
}
for _, v := range matrix {
@ -215,9 +307,8 @@ func parseResponse(value model.Value, query *PrometheusQuery) (plugins.DataQuery
data.NewField("time", nil, timeVector),
data.NewField("value", tags, values).SetConfig(&data.FieldConfig{DisplayNameFromDS: name})))
}
queryRes.Dataframes = plugins.NewDecodedDataFrames(frames)
return queryRes, nil
return frames, nil
}
// IsAPIError returns whether err is or wraps a Prometheus error.

View File

@ -1,40 +1,22 @@
package prometheus
import (
"context"
"net/http"
"testing"
"time"
sdkhttpclient "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/tsdb"
p "github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
)
var now = time.Now()
func TestPrometheus(t *testing.T) {
json, _ := simplejson.NewJson([]byte(`
{ "customQueryParameters": "custom=par/am&second=f oo"}
`))
dsInfo := &models.DataSource{
JsonData: json,
service := Service{
intervalCalculator: tsdb.NewCalculator(),
}
var capturedRequest *http.Request
mw := sdkhttpclient.MiddlewareFunc(func(opts sdkhttpclient.Options, next http.RoundTripper) http.RoundTripper {
return sdkhttpclient.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
capturedRequest = req
return &http.Response{StatusCode: http.StatusOK}, nil
})
})
provider := httpclient.NewProvider(sdkhttpclient.ProviderOptions{
Middlewares: []sdkhttpclient.Middleware{mw},
})
plug, err := New(provider)(dsInfo)
require.NoError(t, err)
executor := plug.(*PrometheusExecutor)
t.Run("converting metric name", func(t *testing.T) {
metric := map[p.LabelName]p.LabelValue{
@ -69,9 +51,12 @@ func TestPrometheus(t *testing.T) {
"format": "time_series",
"refId": "A"
}`)
timerange := plugins.NewDataTimeRange("12h", "now")
query.TimeRange = &timerange
models, err := executor.parseQuery(dsInfo, query)
timeRange := backend.TimeRange{
From: now,
To: now.Add(12 * time.Hour),
}
query.TimeRange = timeRange
models, err := service.parseQuery([]backend.DataQuery{query})
require.NoError(t, err)
require.Equal(t, time.Second*30, models[0].Step)
})
@ -84,9 +69,12 @@ func TestPrometheus(t *testing.T) {
"stepMode": "exact",
"interval": "7s"
}`)
timerange := plugins.NewDataTimeRange("12h", "now")
query.TimeRange = &timerange
models, err := executor.parseQuery(dsInfo, query)
timeRange := backend.TimeRange{
From: now,
To: now.Add(12 * time.Hour),
}
query.TimeRange = timeRange
models, err := service.parseQuery([]backend.DataQuery{query})
require.NoError(t, err)
require.Equal(t, time.Second*7, models[0].Step)
})
@ -99,9 +87,12 @@ func TestPrometheus(t *testing.T) {
"stepMode": "max",
"interval": "6s"
}`)
timerange := plugins.NewDataTimeRange("12h", "now")
query.TimeRange = &timerange
models, err := executor.parseQuery(dsInfo, query)
timeRange := backend.TimeRange{
From: now,
To: now.Add(12 * time.Hour),
}
query.TimeRange = timeRange
models, err := service.parseQuery([]backend.DataQuery{query})
require.NoError(t, err)
require.Equal(t, time.Second*6, models[0].Step)
})
@ -114,9 +105,12 @@ func TestPrometheus(t *testing.T) {
"stepMode": "max",
"interval": "100s"
}`)
timerange := plugins.NewDataTimeRange("12h", "now")
query.TimeRange = &timerange
models, err := executor.parseQuery(dsInfo, query)
timeRange := backend.TimeRange{
From: now,
To: now.Add(12 * time.Hour),
}
query.TimeRange = timeRange
models, err := service.parseQuery([]backend.DataQuery{query})
require.NoError(t, err)
require.Equal(t, time.Second*30, models[0].Step)
})
@ -129,9 +123,12 @@ func TestPrometheus(t *testing.T) {
"stepMode": "max",
"interval": "2s"
}`)
timerange := plugins.NewDataTimeRange("12h", "now")
query.TimeRange = &timerange
models, err := executor.parseQuery(dsInfo, query)
timeRange := backend.TimeRange{
From: now,
To: now.Add(12 * time.Hour),
}
query.TimeRange = timeRange
models, err := service.parseQuery([]backend.DataQuery{query})
require.NoError(t, err)
require.Equal(t, time.Second*5, models[0].Step)
})
@ -143,69 +140,59 @@ func TestPrometheus(t *testing.T) {
"intervalFactor": 1,
"refId": "A"
}`)
models, err := executor.parseQuery(dsInfo, query)
models, err := service.parseQuery([]backend.DataQuery{query})
require.NoError(t, err)
require.Equal(t, time.Minute*2, models[0].Step)
timeRange := plugins.NewDataTimeRange("1h", "now")
query.TimeRange = &timeRange
models, err = executor.parseQuery(dsInfo, query)
timeRange := backend.TimeRange{
From: now,
To: now.Add(1 * time.Hour),
}
query.TimeRange = timeRange
models, err = service.parseQuery([]backend.DataQuery{query})
require.NoError(t, err)
require.Equal(t, time.Second*15, models[0].Step)
})
t.Run("parsing query model with high intervalFactor", func(t *testing.T) {
models, err := executor.parseQuery(dsInfo, queryContext(`{
models, err := service.parseQuery([]backend.DataQuery{queryContext(`{
"expr": "go_goroutines",
"format": "time_series",
"intervalFactor": 10,
"refId": "A"
}`))
}`)})
require.NoError(t, err)
require.Equal(t, time.Minute*20, models[0].Step)
})
t.Run("parsing query model with low intervalFactor", func(t *testing.T) {
models, err := executor.parseQuery(dsInfo, queryContext(`{
models, err := service.parseQuery([]backend.DataQuery{queryContext(`{
"expr": "go_goroutines",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
}`))
}`)})
require.NoError(t, err)
require.Equal(t, time.Minute*2, models[0].Step)
})
t.Run("runs query with custom params", func(t *testing.T) {
query := queryContext(`{
"expr": "go_goroutines",
"format": "time_series",
"intervalFactor": 1,
"refId": "A"
}`)
_, _ = executor.DataQuery(context.Background(), dsInfo, query)
require.NotNil(t, capturedRequest)
require.Equal(t, "custom=par%2Fam&second=f+oo", capturedRequest.URL.RawQuery)
})
}
func queryContext(json string) plugins.DataQuery {
jsonModel, _ := simplejson.NewJson([]byte(json))
queryModels := []plugins.DataSubQuery{
{Model: jsonModel},
func queryContext(json string) backend.DataQuery {
timeRange := backend.TimeRange{
From: now,
To: now.Add(48 * time.Hour),
}
timeRange := plugins.NewDataTimeRange("48h", "now")
return plugins.DataQuery{
TimeRange: &timeRange,
Queries: queryModels,
return backend.DataQuery{
TimeRange: timeRange,
RefID: "A",
JSON: []byte(json),
}
}
func TestParseResponse(t *testing.T) {
t.Run("value is not of type matrix", func(t *testing.T) {
//nolint: staticcheck // plugins.DataQueryResult deprecated
queryRes := plugins.DataQueryResult{}
queryRes := data.Frames{}
value := p.Vector{}
res, err := parseResponse(value, nil)
@ -233,19 +220,18 @@ func TestParseResponse(t *testing.T) {
res, err := parseResponse(value, query)
require.NoError(t, err)
decoded, _ := res.Dataframes.Decoded()
require.Len(t, decoded, 1)
require.Equal(t, decoded[0].Name, "legend Application")
require.Len(t, decoded[0].Fields, 2)
require.Len(t, decoded[0].Fields[0].Labels, 0)
require.Equal(t, decoded[0].Fields[0].Name, "time")
require.Len(t, decoded[0].Fields[1].Labels, 2)
require.Equal(t, decoded[0].Fields[1].Labels.String(), "app=Application, tag2=tag2")
require.Equal(t, decoded[0].Fields[1].Name, "value")
require.Equal(t, decoded[0].Fields[1].Config.DisplayNameFromDS, "legend Application")
require.Len(t, res, 1)
require.Equal(t, res[0].Name, "legend Application")
require.Len(t, res[0].Fields, 2)
require.Len(t, res[0].Fields[0].Labels, 0)
require.Equal(t, res[0].Fields[0].Name, "time")
require.Len(t, res[0].Fields[1].Labels, 2)
require.Equal(t, res[0].Fields[1].Labels.String(), "app=Application, tag2=tag2")
require.Equal(t, res[0].Fields[1].Name, "value")
require.Equal(t, res[0].Fields[1].Config.DisplayNameFromDS, "legend Application")
// Ensure the timestamps are UTC zoned
testValue := decoded[0].Fields[0].At(0)
testValue := res[0].Fields[0].At(0)
require.Equal(t, "UTC", testValue.(time.Time).Location().String())
})
}

View File

@ -16,7 +16,6 @@ import (
"github.com/grafana/grafana/pkg/tsdb/mssql"
"github.com/grafana/grafana/pkg/tsdb/mysql"
"github.com/grafana/grafana/pkg/tsdb/postgres"
"github.com/grafana/grafana/pkg/tsdb/prometheus"
)
// NewService returns a new Service.
@ -52,7 +51,6 @@ type Service struct {
// Init initialises the service.
func (s *Service) Init() error {
s.registry["prometheus"] = prometheus.New(s.HTTPClientProvider)
s.registry["mssql"] = mssql.NewExecutor
s.registry["postgres"] = s.PostgresService.NewExecutor
s.registry["mysql"] = mysql.New(s.HTTPClientProvider)