Chore: Refactor influxdb to use SDK contracts (#36189)

* Use backend SDK for influxdb

* Remove BasicAuth condition, some comments

* Remove unused fields from datasource info

* Register InfluxDBService

* Fix casting and make HTTPClientProvider exported

* Remove unused function

* Remove empty line

* Update pkg/tsdb/influxdb/flux/query_models.go

Co-authored-by: Gábor Farkas <gabor.farkas@gmail.com>

* Read interval from TimeRange instead of Interval

* Change pkg name from datasource->models, minor changes

* Use testify instead of convey

* Add new calculator logic and fix pointer semantic for dsInfo

* Initialise parsers, use tsdb interval pkg

Co-authored-by: Gábor Farkas <gabor.farkas@gmail.com>
This commit is contained in:
idafurjes
2021-07-19 11:32:33 +02:00
committed by GitHub
parent 27b7c35ccc
commit cec12676e7
15 changed files with 525 additions and 444 deletions

View File

@@ -14,10 +14,8 @@ import (
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/experimental"
"github.com/grafana/grafana/pkg/components/securejsondata"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/xorcare/pointer"
@@ -223,15 +221,11 @@ func TestRealQuery(t *testing.T) {
json := simplejson.New()
json.Set("organization", "test-org")
dsInfo := &models.DataSource{
Url: "http://localhost:9999", // NOTE! no api/v2
JsonData: json,
SecureJsonData: securejsondata.GetEncryptedJsonData(map[string]string{
"token": "PjSEcM5oWhqg2eI6IXcqYJFe5UbMM_xt-UNlAL0BRYJqLeVpcdMWidiPfWxGhu4Xrh6wioRR-CiadCg-ady68Q==",
}),
dsInfo := &models.DatasourceInfo{
URL: "http://localhost:9999", // NOTE! no api/v2
}
runner, err := runnerFromDataSource(httpclient.NewProvider(), dsInfo)
runner, err := runnerFromDataSource(dsInfo)
require.NoError(t, err)
dr := executeQuery(context.Background(), queryModel{

View File

@@ -5,10 +5,8 @@ import (
"fmt"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
)
@@ -22,31 +20,29 @@ func init() {
}
// Query builds flux queries, executes them, and returns the results.
//nolint: staticcheck // plugins.DataQuery deprecated
func Query(ctx context.Context, httpClientProvider httpclient.Provider, dsInfo *models.DataSource, tsdbQuery plugins.DataQuery) (
plugins.DataResponse, error) {
func Query(ctx context.Context, dsInfo *models.DatasourceInfo, tsdbQuery backend.QueryDataRequest) (
*backend.QueryDataResponse, error) {
tRes := backend.NewQueryDataResponse()
glog.Debug("Received a query", "query", tsdbQuery)
tRes := plugins.DataResponse{
Results: make(map[string]plugins.DataQueryResult),
}
r, err := runnerFromDataSource(httpClientProvider, dsInfo)
r, err := runnerFromDataSource(dsInfo)
if err != nil {
return plugins.DataResponse{}, err
return &backend.QueryDataResponse{}, err
}
defer r.client.Close()
timeRange := tsdbQuery.Queries[0].TimeRange
for _, query := range tsdbQuery.Queries {
qm, err := getQueryModelTSDB(query, *tsdbQuery.TimeRange, dsInfo)
qm, err := getQueryModel(query, timeRange, dsInfo)
if err != nil {
tRes.Results[query.RefID] = plugins.DataQueryResult{Error: err}
tRes.Responses[query.RefID] = backend.DataResponse{Error: err}
continue
}
// If the default changes also update labels/placeholder in config page.
maxSeries := dsInfo.JsonData.Get("maxSeries").MustInt(1000)
maxSeries := dsInfo.MaxSeries
res := executeQuery(ctx, *qm, r, maxSeries)
tRes.Results[query.RefID] = backendDataResponseToDataResponse(&res, query.RefID)
tRes.Responses[query.RefID] = res
}
return tRes, nil
}
@@ -70,44 +66,20 @@ func (r *runner) runQuery(ctx context.Context, fluxQuery string) (*api.QueryTabl
}
// runnerFromDataSource creates a runner from the datasource model (the datasource instance's configuration).
func runnerFromDataSource(httpClientProvider httpclient.Provider, dsInfo *models.DataSource) (*runner, error) {
org := dsInfo.JsonData.Get("organization").MustString("")
func runnerFromDataSource(dsInfo *models.DatasourceInfo) (*runner, error) {
org := dsInfo.Organization
if org == "" {
return nil, fmt.Errorf("missing organization in datasource configuration")
}
url := dsInfo.Url
url := dsInfo.URL
if url == "" {
return nil, fmt.Errorf("missing URL from datasource configuration")
}
token, found := dsInfo.SecureJsonData.DecryptedValue("token")
if !found {
return nil, fmt.Errorf("token is missing from datasource configuration and is needed to use Flux")
}
opts := influxdb2.DefaultOptions()
hc, err := dsInfo.GetHTTPClient(httpClientProvider)
if err != nil {
return nil, err
}
opts.HTTPOptions().SetHTTPClient(hc)
opts.HTTPOptions().SetHTTPClient(dsInfo.HTTPClient)
return &runner{
client: influxdb2.NewClientWithOptions(url, token, opts),
client: influxdb2.NewClientWithOptions(url, dsInfo.Token, opts),
org: org,
}, nil
}
// backendDataResponseToDataResponse takes the SDK's style response and changes it into a
// plugins.DataQueryResult. This is a wrapper so less of existing code needs to be changed. This should
// be able to be removed in the near future https://github.com/grafana/grafana/pull/25472.
//nolint: staticcheck // plugins.DataQueryResult deprecated
func backendDataResponseToDataResponse(dr *backend.DataResponse, refID string) plugins.DataQueryResult {
qr := plugins.DataQueryResult{
RefID: refID,
Error: dr.Error,
}
if dr.Frames != nil {
qr.Dataframes = plugins.NewDecodedDataFrames(dr.Frames)
}
return qr
}

View File

@@ -6,8 +6,7 @@ import (
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
)
// queryOptions represents datasource configuration options
@@ -28,66 +27,29 @@ type queryModel struct {
Interval time.Duration `json:"-"`
}
// The following is commented out but kept as it should be useful when
// restoring this code to be closer to the SDK's models.
// func GetQueryModel(query backend.DataQuery) (*queryModel, error) {
// model := &queryModel{}
// err := json.Unmarshal(query.JSON, &model)
// if err != nil {
// return nil, fmt.Errorf("error reading query: %s", err.Error())
// }
// // Copy directly from the well typed query
// model.TimeRange = query.TimeRange
// model.MaxDataPoints = query.MaxDataPoints
// model.Interval = query.Interval
// return model, nil
// }
// getQueryModelTSDB builds a queryModel from plugins.DataQuery information and datasource configuration (dsInfo).
func getQueryModelTSDB(query plugins.DataSubQuery, timeRange plugins.DataTimeRange,
dsInfo *models.DataSource) (*queryModel, error) {
func getQueryModel(query backend.DataQuery, timeRange backend.TimeRange,
dsInfo *models.DatasourceInfo) (*queryModel, error) {
model := &queryModel{}
queryBytes, err := query.Model.Encode()
if err != nil {
return nil, fmt.Errorf("failed to re-encode the flux query into JSON: %w", err)
}
if err := json.Unmarshal(queryBytes, &model); err != nil {
if err := json.Unmarshal(query.JSON, model); err != nil {
return nil, fmt.Errorf("error reading query: %w", err)
}
if model.Options.DefaultBucket == "" {
model.Options.DefaultBucket = dsInfo.JsonData.Get("defaultBucket").MustString("")
model.Options.DefaultBucket = dsInfo.DefaultBucket
}
if model.Options.Bucket == "" {
model.Options.Bucket = model.Options.DefaultBucket
}
if model.Options.Organization == "" {
model.Options.Organization = dsInfo.JsonData.Get("organization").MustString("")
}
startTime, err := timeRange.ParseFrom()
if err != nil && timeRange.From != "" {
return nil, fmt.Errorf("error reading startTime: %w", err)
}
endTime, err := timeRange.ParseTo()
if err != nil && timeRange.To != "" {
return nil, fmt.Errorf("error reading endTime: %w", err)
model.Options.Organization = dsInfo.Organization
}
// Copy directly from the well typed query
model.TimeRange = backend.TimeRange{
From: startTime,
To: endTime,
}
model.TimeRange = timeRange
model.MaxDataPoints = query.MaxDataPoints
if model.MaxDataPoints == 0 {
model.MaxDataPoints = 10000 // 10k/series should be a reasonable place to abort!
}
model.Interval = time.Millisecond * time.Duration(query.IntervalMS)
model.Interval = query.Interval
if model.Interval.Milliseconds() == 0 {
model.Interval = time.Millisecond // 1ms
}

View File

@@ -2,6 +2,7 @@ package influxdb
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
@@ -9,30 +10,28 @@ import (
"path"
"strings"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/plugins/backendplugin"
"github.com/grafana/grafana/pkg/plugins/backendplugin/coreplugin"
"github.com/grafana/grafana/pkg/registry"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/influxdb/flux"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
)
type Executor struct {
httpClientProvider httpclient.Provider
QueryParser *InfluxdbQueryParser
ResponseParser *ResponseParser
}
type Service struct {
HTTPClientProvider httpclient.Provider `inject:""`
BackendPluginManager backendplugin.Manager `inject:""`
QueryParser *InfluxdbQueryParser
ResponseParser *ResponseParser
// nolint:staticcheck // plugins.DataPlugin deprecated
func New(httpClientProvider httpclient.Provider) func(*models.DataSource) (plugins.DataPlugin, error) {
// nolint:staticcheck // plugins.DataPlugin deprecated
return func(dsInfo *models.DataSource) (plugins.DataPlugin, error) {
return &Executor{
httpClientProvider: httpClientProvider,
QueryParser: &InfluxdbQueryParser{},
ResponseParser: &ResponseParser{},
}, nil
}
im instancemgmt.InstanceManager
}
var (
@@ -42,17 +41,81 @@ var (
var ErrInvalidHttpMode error = errors.New("'httpMode' should be either 'GET' or 'POST'")
func init() {
glog = log.New("tsdb.influxdb")
registry.Register(&registry.Descriptor{
Name: "InfluxDBService",
InitPriority: registry.Low,
Instance: &Service{},
})
}
//nolint: staticcheck // plugins.DataResponse deprecated
func (e *Executor) DataQuery(ctx context.Context, dsInfo *models.DataSource, tsdbQuery plugins.DataQuery) (
plugins.DataResponse, error) {
glog.Debug("Received a query request", "numQueries", len(tsdbQuery.Queries))
func (s *Service) Init() error {
glog = log.New("tsdb.influxdb")
s.QueryParser = &InfluxdbQueryParser{}
s.ResponseParser = &ResponseParser{}
s.im = datasource.NewInstanceManager(newInstanceSettings(s.HTTPClientProvider))
version := dsInfo.JsonData.Get("version").MustString("")
factory := coreplugin.New(backend.ServeOpts{
QueryDataHandler: s,
})
if err := s.BackendPluginManager.RegisterAndStart(context.Background(), "influxdb", factory); err != nil {
glog.Error("Failed to register plugin", "error", err)
}
return nil
}
func newInstanceSettings(httpClientProvider httpclient.Provider) datasource.InstanceFactoryFunc {
return func(settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
opts, err := settings.HTTPClientOptions()
if err != nil {
return nil, err
}
client, err := httpClientProvider.New(opts)
if err != nil {
return nil, err
}
jsonData := models.DatasourceInfo{}
err = json.Unmarshal(settings.JSONData, &jsonData)
if err != nil {
return nil, fmt.Errorf("error reading settings: %w", err)
}
httpMode := jsonData.HTTPMode
if httpMode == "" {
httpMode = "GET"
}
maxSeries := jsonData.MaxSeries
if maxSeries == 0 {
maxSeries = 1000
}
model := &models.DatasourceInfo{
HTTPClient: client,
URL: settings.URL,
Database: settings.Database,
Version: jsonData.Version,
HTTPMode: httpMode,
TimeInterval: jsonData.TimeInterval,
DefaultBucket: jsonData.DefaultBucket,
Organization: jsonData.Organization,
MaxSeries: maxSeries,
Token: settings.DecryptedSecureJSONData["token"],
}
return model, nil
}
}
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
glog.Debug("Received a query request", "numQueries", len(req.Queries))
dsInfo, err := s.getDSInfo(req.PluginContext)
if err != nil {
return nil, err
}
version := dsInfo.Version
if version == "Flux" {
return flux.Query(ctx, e.httpClientProvider, dsInfo, tsdbQuery)
return flux.Query(ctx, dsInfo, *req)
}
glog.Debug("Making a non-Flux type query")
@@ -60,70 +123,65 @@ func (e *Executor) DataQuery(ctx context.Context, dsInfo *models.DataSource, tsd
// NOTE: the following path is currently only called from alerting queries
// In dashboards, the request runs through proxy and are managed in the frontend
query, err := e.getQuery(dsInfo, tsdbQuery)
query, err := s.getQuery(dsInfo, req)
if err != nil {
return plugins.DataResponse{}, err
return &backend.QueryDataResponse{}, err
}
rawQuery, err := query.Build(tsdbQuery)
rawQuery, err := query.Build(req)
if err != nil {
return plugins.DataResponse{}, err
return &backend.QueryDataResponse{}, err
}
if setting.Env == setting.Dev {
glog.Debug("Influxdb query", "raw query", rawQuery)
}
req, err := e.createRequest(ctx, dsInfo, rawQuery)
request, err := s.createRequest(ctx, dsInfo, rawQuery)
if err != nil {
return plugins.DataResponse{}, err
return &backend.QueryDataResponse{}, err
}
httpClient, err := dsInfo.GetHTTPClient(e.httpClientProvider)
res, err := dsInfo.HTTPClient.Do(request)
if err != nil {
return plugins.DataResponse{}, err
}
resp, err := httpClient.Do(req)
if err != nil {
return plugins.DataResponse{}, err
return &backend.QueryDataResponse{}, err
}
defer func() {
if err := resp.Body.Close(); err != nil {
if err := res.Body.Close(); err != nil {
glog.Warn("Failed to close response body", "err", err)
}
}()
if resp.StatusCode/100 != 2 {
return plugins.DataResponse{}, fmt.Errorf("InfluxDB returned error status: %s", resp.Status)
if res.StatusCode/100 != 2 {
return &backend.QueryDataResponse{}, fmt.Errorf("InfluxDB returned error status: %s", res.Status)
}
result := plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{
"A": e.ResponseParser.Parse(resp.Body, query),
},
}
resp := s.ResponseParser.Parse(res.Body, query)
return result, nil
return resp, nil
}
func (e *Executor) getQuery(dsInfo *models.DataSource, query plugins.DataQuery) (*Query, error) {
func (s *Service) getQuery(dsInfo *models.DatasourceInfo, query *backend.QueryDataRequest) (*Query, error) {
if len(query.Queries) == 0 {
return nil, fmt.Errorf("query request contains no queries")
}
// The model supports multiple queries, but right now this is only used from
// alerting so we only needed to support batch executing 1 query at a time.
return e.QueryParser.Parse(query.Queries[0].Model, dsInfo)
model, err := simplejson.NewJson(query.Queries[0].JSON)
if err != nil {
return nil, fmt.Errorf("couldn't unmarshal query")
}
return s.QueryParser.Parse(model, dsInfo)
}
func (e *Executor) createRequest(ctx context.Context, dsInfo *models.DataSource, query string) (*http.Request, error) {
u, err := url.Parse(dsInfo.Url)
func (s *Service) createRequest(ctx context.Context, dsInfo *models.DatasourceInfo, query string) (*http.Request, error) {
u, err := url.Parse(dsInfo.URL)
if err != nil {
return nil, err
}
u.Path = path.Join(u.Path, "query")
httpMode := dsInfo.JsonData.Get("httpMode").MustString("GET")
httpMode := dsInfo.HTTPMode
var req *http.Request
switch httpMode {
@@ -158,14 +216,20 @@ func (e *Executor) createRequest(ctx context.Context, dsInfo *models.DataSource,
req.URL.RawQuery = params.Encode()
if dsInfo.BasicAuth {
req.SetBasicAuth(dsInfo.BasicAuthUser, dsInfo.DecryptedBasicAuthPassword())
}
if !dsInfo.BasicAuth && dsInfo.User != "" {
req.SetBasicAuth(dsInfo.User, dsInfo.DecryptedPassword())
}
glog.Debug("Influxdb request", "url", req.URL.String())
return req, nil
}
func (s *Service) getDSInfo(pluginCtx backend.PluginContext) (*models.DatasourceInfo, error) {
i, err := s.im.Get(pluginCtx)
if err != nil {
return nil, err
}
instance, ok := i.(*models.DatasourceInfo)
if !ok {
return nil, fmt.Errorf("failed to cast datsource info")
}
return instance, nil
}

View File

@@ -6,26 +6,28 @@ import (
"net/url"
"testing"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestExecutor_createRequest(t *testing.T) {
datasource := &models.DataSource{
Url: "http://awesome-influxdb:1337",
datasource := &models.DatasourceInfo{
URL: "http://awesome-influxdb:1337",
Database: "awesome-db",
JsonData: simplejson.New(),
HTTPMode: "GET",
}
query := "SELECT awesomeness FROM somewhere"
e := &Executor{
s := &Service{
QueryParser: &InfluxdbQueryParser{},
ResponseParser: &ResponseParser{},
}
t.Run("createRequest with GET httpMode", func(t *testing.T) {
req, err := e.createRequest(context.Background(), datasource, query)
glog = log.New("test")
req, err := s.createRequest(context.Background(), datasource, query)
require.NoError(t, err)
assert.Equal(t, "GET", req.Method)
@@ -37,8 +39,8 @@ func TestExecutor_createRequest(t *testing.T) {
})
t.Run("createRequest with POST httpMode", func(t *testing.T) {
datasource.JsonData.Set("httpMode", "POST")
req, err := e.createRequest(context.Background(), datasource, query)
datasource.HTTPMode = "POST"
req, err := s.createRequest(context.Background(), datasource, query)
require.NoError(t, err)
assert.Equal(t, "POST", req.Method)
@@ -56,8 +58,8 @@ func TestExecutor_createRequest(t *testing.T) {
})
t.Run("createRequest with PUT httpMode", func(t *testing.T) {
datasource.JsonData.Set("httpMode", "PUT")
_, err := e.createRequest(context.Background(), datasource, query)
datasource.HTTPMode = "PUT"
_, err := s.createRequest(context.Background(), datasource, query)
require.EqualError(t, err, ErrInvalidHttpMode.Error())
})
}

View File

@@ -5,13 +5,13 @@ import (
"time"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb/interval"
"github.com/grafana/grafana/pkg/tsdb"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
)
type InfluxdbQueryParser struct{}
func (qp *InfluxdbQueryParser) Parse(model *simplejson.Json, dsInfo *models.DataSource) (*Query, error) {
func (qp *InfluxdbQueryParser) Parse(model *simplejson.Json, dsInfo *models.DatasourceInfo) (*Query, error) {
policy := model.Get("policy").MustString("default")
rawQuery := model.Get("query").MustString("")
useRawQuery := model.Get("rawQuery").MustBool(false)
@@ -40,7 +40,9 @@ func (qp *InfluxdbQueryParser) Parse(model *simplejson.Json, dsInfo *models.Data
return nil, err
}
parsedInterval, err := interval.GetIntervalFrom(dsInfo, model, time.Millisecond*1)
queryInterval := model.Get("interval").MustString("")
intervalMS := model.Get("intervalMs").MustInt(0)
parsedInterval, err := tsdb.GetIntervalFrom(dsInfo.TimeInterval, queryInterval, int64(intervalMS), time.Millisecond*1)
if err != nil {
return nil, err
}

View File

@@ -5,15 +5,13 @@ import (
"time"
"github.com/grafana/grafana/pkg/components/simplejson"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/tsdb/influxdb/models"
"github.com/stretchr/testify/require"
)
func TestInfluxdbQueryParser_Parse(t *testing.T) {
parser := &InfluxdbQueryParser{}
dsInfo := &models.DataSource{
JsonData: simplejson.New(),
}
dsInfo := &models.DatasourceInfo{}
t.Run("can parse influxdb json model", func(t *testing.T) {
json := `
@@ -105,7 +103,7 @@ func TestInfluxdbQueryParser_Parse(t *testing.T) {
]
}
`
dsInfo.JsonData.Set("timeInterval", ">20s")
dsInfo.TimeInterval = ">20s"
modelJSON, err := simplejson.NewJson([]byte(json))
require.NoError(t, err)

View File

@@ -0,0 +1,19 @@
package models
import (
"net/http"
)
type DatasourceInfo struct {
HTTPClient *http.Client
Token string
URL string
Database string `json:"database"`
Version string `json:"version"`
HTTPMode string `json:"httpMode"`
TimeInterval string `json:"timeInterval"`
DefaultBucket string `json:"defaultBucket"`
Organization string `json:"organization"`
MaxSeries int `json:"maxSeries"`
}

View File

@@ -5,9 +5,10 @@ import (
"regexp"
"strconv"
"strings"
"time"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/tsdb/interval"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana/pkg/tsdb"
)
var (
@@ -15,7 +16,7 @@ var (
regexpMeasurementPattern = regexp.MustCompile(`^\/.*\/$`)
)
func (query *Query) Build(queryContext plugins.DataQuery) (string, error) {
func (query *Query) Build(queryContext *backend.QueryDataRequest) (string, error) {
var res string
if query.UseRawQuery && query.RawQuery != "" {
res = query.RawQuery
@@ -28,13 +29,15 @@ func (query *Query) Build(queryContext plugins.DataQuery) (string, error) {
res += query.renderTz()
}
calculator := interval.NewCalculator(interval.CalculatorOptions{})
i := calculator.Calculate(*queryContext.TimeRange, query.Interval)
calculator := tsdb.NewCalculator(tsdb.CalculatorOptions{})
i := calculator.Calculate(queryContext.Queries[0].TimeRange, query.Interval)
res = strings.ReplaceAll(res, "$timeFilter", query.renderTimeFilter(queryContext))
res = strings.ReplaceAll(res, "$interval", i.Text)
res = strings.ReplaceAll(res, "$__interval_ms", strconv.FormatInt(i.Milliseconds(), 10))
res = strings.ReplaceAll(res, "$__interval", i.Text)
return res, nil
}
@@ -78,40 +81,12 @@ func (query *Query) renderTags() []string {
return res
}
func isTimeRangeNumeric(tr *plugins.DataTimeRange) bool {
if _, err := strconv.ParseInt(tr.From, 10, 64); err != nil {
return false
}
if _, err := strconv.ParseInt(tr.To, 10, 64); err != nil {
return false
}
return true
func (query *Query) renderTimeFilter(queryContext *backend.QueryDataRequest) string {
from, to := epochMStoInfluxTime(&queryContext.Queries[0].TimeRange)
return fmt.Sprintf("time > %s and time < %s", from, to)
}
func (query *Query) renderTimeFilter(queryContext plugins.DataQuery) string {
// If from expressions
if isTimeRangeNumeric(queryContext.TimeRange) {
from, to, err := epochMStoInfluxTime(queryContext.TimeRange)
if err == nil {
return fmt.Sprintf(" time > %s and time < %s ", from, to)
}
// on error fallback to original time range processing.
glog.Warn("failed to parse expected time range in query, falling back to non-expression time range processing", "error", err)
}
// else from dashboard alerting
from := "now() - " + queryContext.TimeRange.From
to := ""
if queryContext.TimeRange.To != "now" && queryContext.TimeRange.To != "" {
to = " and time < now() - " + strings.Replace(queryContext.TimeRange.To, "now-", "", 1)
}
return fmt.Sprintf("time > %s%s", from, to)
}
func (query *Query) renderSelectors(queryContext plugins.DataQuery) string {
func (query *Query) renderSelectors(queryContext *backend.QueryDataRequest) string {
res := "SELECT "
var selectors []string
@@ -158,7 +133,7 @@ func (query *Query) renderWhereClause() string {
return res
}
func (query *Query) renderGroupBy(queryContext plugins.DataQuery) string {
func (query *Query) renderGroupBy(queryContext *backend.QueryDataRequest) string {
groupBy := ""
for i, group := range query.GroupBy {
if i == 0 {
@@ -185,16 +160,9 @@ func (query *Query) renderTz() string {
return fmt.Sprintf(" tz('%s')", tz)
}
func epochMStoInfluxTime(tr *plugins.DataTimeRange) (string, string, error) {
from, err := strconv.ParseInt(tr.From, 10, 64)
if err != nil {
return "", "", err
}
func epochMStoInfluxTime(tr *backend.TimeRange) (string, string) {
from := tr.From.UnixNano() / int64(time.Millisecond)
to := tr.To.UnixNano() / int64(time.Millisecond)
to, err := strconv.ParseInt(tr.To, 10, 64)
if err != nil {
return "", "", err
}
return fmt.Sprintf("%dms", from), fmt.Sprintf("%dms", to), nil
return fmt.Sprintf("%dms", from), fmt.Sprintf("%dms", to)
}

View File

@@ -4,7 +4,7 @@ import (
"fmt"
"strings"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana-plugin-sdk-go/backend"
)
var renders map[string]QueryDefinition
@@ -15,7 +15,7 @@ type DefinitionParameters struct {
}
type QueryDefinition struct {
Renderer func(query *Query, queryContext plugins.DataQuery, part *QueryPart, innerExpr string) string
Renderer func(query *Query, queryContext *backend.QueryDataRequest, part *QueryPart, innerExpr string) string
Params []DefinitionParameters
}
@@ -97,14 +97,14 @@ func init() {
renders["alias"] = QueryDefinition{Renderer: aliasRenderer}
}
func fieldRenderer(query *Query, queryContext plugins.DataQuery, part *QueryPart, innerExpr string) string {
func fieldRenderer(query *Query, queryContext *backend.QueryDataRequest, part *QueryPart, innerExpr string) string {
if part.Params[0] == "*" {
return "*"
}
return fmt.Sprintf(`"%s"`, part.Params[0])
}
func functionRenderer(query *Query, queryContext plugins.DataQuery, part *QueryPart, innerExpr string) string {
func functionRenderer(query *Query, queryContext *backend.QueryDataRequest, part *QueryPart, innerExpr string) string {
for i, param := range part.Params {
if part.Type == "time" && param == "auto" {
part.Params[i] = "$__interval"
@@ -120,11 +120,11 @@ func functionRenderer(query *Query, queryContext plugins.DataQuery, part *QueryP
return fmt.Sprintf("%s(%s)", part.Type, params)
}
func suffixRenderer(query *Query, queryContext plugins.DataQuery, part *QueryPart, innerExpr string) string {
func suffixRenderer(query *Query, queryContext *backend.QueryDataRequest, part *QueryPart, innerExpr string) string {
return fmt.Sprintf("%s %s", innerExpr, part.Params[0])
}
func aliasRenderer(query *Query, queryContext plugins.DataQuery, part *QueryPart, innerExpr string) string {
func aliasRenderer(query *Query, queryContext *backend.QueryDataRequest, part *QueryPart, innerExpr string) string {
return fmt.Sprintf(`%s AS "%s"`, innerExpr, part.Params[0])
}
@@ -147,6 +147,6 @@ type QueryPart struct {
Params []string
}
func (qp *QueryPart) Render(query *Query, queryContext plugins.DataQuery, expr string) string {
func (qp *QueryPart) Render(query *Query, queryContext *backend.QueryDataRequest, expr string) string {
return qp.Def.Renderer(query, queryContext, qp, expr)
}

View File

@@ -3,7 +3,7 @@ package influxdb
import (
"testing"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana-plugin-sdk-go/backend"
)
func TestInfluxdbQueryPart(t *testing.T) {
@@ -27,8 +27,7 @@ func TestInfluxdbQueryPart(t *testing.T) {
{mode: "non_negative_difference", params: []string{}, input: "max(value)", expected: `non_negative_difference(max(value))`},
}
timeRange := plugins.NewDataTimeRange("5m", "now")
queryContext := plugins.DataQuery{TimeRange: &timeRange}
queryContext := &backend.QueryDataRequest{}
query := &Query{}
for _, tc := range tcs {

View File

@@ -1,17 +1,16 @@
package influxdb
import (
"strings"
"testing"
"time"
"strings"
"github.com/grafana/grafana/pkg/plugins"
. "github.com/smartystreets/goconvey/convey"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/stretchr/testify/require"
)
func TestInfluxdbQueryBuilder(t *testing.T) {
Convey("Influxdb query builder", t, func() {
t.Run("Influxdb query builder", func(t *testing.T) {
qp1, _ := NewQueryPart("field", []string{"value"})
qp2, _ := NewQueryPart("mean", []string{})
@@ -27,12 +26,19 @@ func TestInfluxdbQueryBuilder(t *testing.T) {
tag1 := &Tag{Key: "hostname", Value: "server1", Operator: "="}
tag2 := &Tag{Key: "hostname", Value: "server2", Operator: "=", Condition: "OR"}
timeRange := plugins.NewDataTimeRange("5m", "now")
queryContext := plugins.DataQuery{
TimeRange: &timeRange,
timeRange := backend.TimeRange{
From: time.Date(2020, 8, 1, 0, 0, 0, 0, time.UTC),
To: time.Date(2020, 8, 1, 0, 5, 0, 0, time.UTC),
}
queryContext := &backend.QueryDataRequest{
Queries: []backend.DataQuery{
{
TimeRange: timeRange,
},
},
}
Convey("can build simple query", func() {
t.Run("can build simple query", func(t *testing.T) {
query := &Query{
Selects: []*Select{{*qp1, *qp2}},
Measurement: "cpu",
@@ -42,11 +48,11 @@ func TestInfluxdbQueryBuilder(t *testing.T) {
}
rawQuery, err := query.Build(queryContext)
So(err, ShouldBeNil)
So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "policy"."cpu" WHERE time > now() - 5m GROUP BY time(10s) fill(null)`)
require.NoError(t, err)
require.Equal(t, rawQuery, `SELECT mean("value") FROM "policy"."cpu" WHERE time > 1596240000000ms and time < 1596240300000ms GROUP BY time(10s) fill(null)`)
})
Convey("can build query with tz", func() {
t.Run("can build query with tz", func(t *testing.T) {
query := &Query{
Selects: []*Select{{*qp1, *qp2}},
Measurement: "cpu",
@@ -56,11 +62,11 @@ func TestInfluxdbQueryBuilder(t *testing.T) {
}
rawQuery, err := query.Build(queryContext)
So(err, ShouldBeNil)
So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE time > now() - 5m GROUP BY time(5s) tz('Europe/Paris')`)
require.NoError(t, err)
require.Equal(t, rawQuery, `SELECT mean("value") FROM "cpu" WHERE time > 1596240000000ms and time < 1596240300000ms GROUP BY time(5s) tz('Europe/Paris')`)
})
Convey("can build query with group bys", func() {
t.Run("can build query with group bys", func(t *testing.T) {
query := &Query{
Selects: []*Select{{*qp1, *qp2}},
Measurement: "cpu",
@@ -70,11 +76,11 @@ func TestInfluxdbQueryBuilder(t *testing.T) {
}
rawQuery, err := query.Build(queryContext)
So(err, ShouldBeNil)
So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE ("hostname" = 'server1' OR "hostname" = 'server2') AND time > now() - 5m GROUP BY time(5s), "datacenter" fill(null)`)
require.NoError(t, err)
require.Equal(t, rawQuery, `SELECT mean("value") FROM "cpu" WHERE ("hostname" = 'server1' OR "hostname" = 'server2') AND time > 1596240000000ms and time < 1596240300000ms GROUP BY time(5s), "datacenter" fill(null)`)
})
Convey("can build query with math part", func() {
t.Run("can build query with math part", func(t *testing.T) {
query := &Query{
Selects: []*Select{{*qp1, *qp2, *mathPartDivideBy100}},
Measurement: "cpu",
@@ -82,11 +88,11 @@ func TestInfluxdbQueryBuilder(t *testing.T) {
}
rawQuery, err := query.Build(queryContext)
So(err, ShouldBeNil)
So(rawQuery, ShouldEqual, `SELECT mean("value") / 100 FROM "cpu" WHERE time > now() - 5m`)
require.NoError(t, err)
require.Equal(t, rawQuery, `SELECT mean("value") / 100 FROM "cpu" WHERE time > 1596240000000ms and time < 1596240300000ms`)
})
Convey("can build query with math part using $__interval_ms variable", func() {
t.Run("can build query with math part using $__interval_ms variable", func(t *testing.T) {
query := &Query{
Selects: []*Select{{*qp1, *qp2, *mathPartDivideByIntervalMs}},
Measurement: "cpu",
@@ -94,40 +100,59 @@ func TestInfluxdbQueryBuilder(t *testing.T) {
}
rawQuery, err := query.Build(queryContext)
So(err, ShouldBeNil)
So(rawQuery, ShouldEqual, `SELECT mean("value") / 5000 FROM "cpu" WHERE time > now() - 5m`)
require.NoError(t, err)
require.Equal(t, rawQuery, `SELECT mean("value") / 5000 FROM "cpu" WHERE time > 1596240000000ms and time < 1596240300000ms`)
})
Convey("can build query with old $interval variable", func() {
t.Run("can build query with old $interval variable", func(t *testing.T) {
query := &Query{
Selects: []*Select{{*qp1, *qp2}},
Measurement: "cpu",
Policy: "",
GroupBy: []*QueryPart{groupByOldInterval},
Interval: time.Millisecond * 200,
}
rawQuery, err := query.Build(queryContext)
So(err, ShouldBeNil)
So(rawQuery, ShouldEqual, `SELECT mean("value") FROM "cpu" WHERE time > now() - 5m GROUP BY time(200ms)`)
require.NoError(t, err)
require.Equal(t, rawQuery, `SELECT mean("value") FROM "cpu" WHERE time > 1596240000000ms and time < 1596240300000ms GROUP BY time(200ms)`)
})
Convey("can render time range", func() {
t.Run("can render time range", func(t *testing.T) {
query := Query{}
Convey("render from: 2h to now-1h", func() {
t.Run("render from: 2h to now-1h", func(t *testing.T) {
query := Query{}
timeRange := plugins.NewDataTimeRange("2h", "now-1h")
queryContext := plugins.DataQuery{TimeRange: &timeRange}
So(query.renderTimeFilter(queryContext), ShouldEqual, "time > now() - 2h and time < now() - 1h")
timeRange = backend.TimeRange{
From: time.Date(2020, 8, 1, 0, 0, 0, 0, time.UTC),
To: time.Date(2020, 8, 1, 1, 0, 0, 0, time.UTC),
}
queryContext = &backend.QueryDataRequest{
Queries: []backend.DataQuery{
{
TimeRange: timeRange,
},
},
}
require.Equal(t, query.renderTimeFilter(queryContext), "time > 1596240000000ms and time < 1596243600000ms")
})
Convey("render from: 10m", func() {
timeRange := plugins.NewDataTimeRange("10m", "now")
queryContext := plugins.DataQuery{TimeRange: &timeRange}
So(query.renderTimeFilter(queryContext), ShouldEqual, "time > now() - 10m")
t.Run("render from: 10m", func(t *testing.T) {
timeRange = backend.TimeRange{
From: time.Date(2020, 8, 1, 0, 0, 0, 0, time.UTC),
To: time.Date(2020, 8, 1, 0, 10, 0, 0, time.UTC),
}
queryContext = &backend.QueryDataRequest{
Queries: []backend.DataQuery{
{
TimeRange: timeRange,
},
},
}
require.Equal(t, query.renderTimeFilter(queryContext), "time > 1596240000000ms and time < 1596240600000ms")
})
})
Convey("can build query from raw query", func() {
t.Run("can build query from raw query", func(t *testing.T) {
query := &Query{
Selects: []*Select{{*qp1, *qp2}},
Measurement: "cpu",
@@ -139,68 +164,68 @@ func TestInfluxdbQueryBuilder(t *testing.T) {
}
rawQuery, err := query.Build(queryContext)
So(err, ShouldBeNil)
So(rawQuery, ShouldEqual, `Raw query`)
require.NoError(t, err)
require.Equal(t, rawQuery, `Raw query`)
})
Convey("can render normal tags without operator", func() {
t.Run("can render normal tags without operator", func(t *testing.T) {
query := &Query{Tags: []*Tag{{Operator: "", Value: `value`, Key: "key"}}}
So(strings.Join(query.renderTags(), ""), ShouldEqual, `"key" = 'value'`)
require.Equal(t, strings.Join(query.renderTags(), ""), `"key" = 'value'`)
})
Convey("can render regex tags without operator", func() {
t.Run("can render regex tags without operator", func(t *testing.T) {
query := &Query{Tags: []*Tag{{Operator: "", Value: `/value/`, Key: "key"}}}
So(strings.Join(query.renderTags(), ""), ShouldEqual, `"key" =~ /value/`)
require.Equal(t, strings.Join(query.renderTags(), ""), `"key" =~ /value/`)
})
Convey("can render regex tags", func() {
t.Run("can render regex tags", func(t *testing.T) {
query := &Query{Tags: []*Tag{{Operator: "=~", Value: `/value/`, Key: "key"}}}
So(strings.Join(query.renderTags(), ""), ShouldEqual, `"key" =~ /value/`)
require.Equal(t, strings.Join(query.renderTags(), ""), `"key" =~ /value/`)
})
Convey("can render number tags", func() {
t.Run("can render number tags", func(t *testing.T) {
query := &Query{Tags: []*Tag{{Operator: "=", Value: "10001", Key: "key"}}}
So(strings.Join(query.renderTags(), ""), ShouldEqual, `"key" = '10001'`)
require.Equal(t, strings.Join(query.renderTags(), ""), `"key" = '10001'`)
})
Convey("can render numbers less then condition tags", func() {
t.Run("can render numbers less then condition tags", func(t *testing.T) {
query := &Query{Tags: []*Tag{{Operator: "<", Value: "10001", Key: "key"}}}
So(strings.Join(query.renderTags(), ""), ShouldEqual, `"key" < 10001`)
require.Equal(t, strings.Join(query.renderTags(), ""), `"key" < 10001`)
})
Convey("can render number greater then condition tags", func() {
t.Run("can render number greater then condition tags", func(t *testing.T) {
query := &Query{Tags: []*Tag{{Operator: ">", Value: "10001", Key: "key"}}}
So(strings.Join(query.renderTags(), ""), ShouldEqual, `"key" > 10001`)
require.Equal(t, strings.Join(query.renderTags(), ""), `"key" > 10001`)
})
Convey("can render string tags", func() {
t.Run("can render string tags", func(t *testing.T) {
query := &Query{Tags: []*Tag{{Operator: "=", Value: "value", Key: "key"}}}
So(strings.Join(query.renderTags(), ""), ShouldEqual, `"key" = 'value'`)
require.Equal(t, strings.Join(query.renderTags(), ""), `"key" = 'value'`)
})
Convey("can escape backslashes when rendering string tags", func() {
t.Run("can escape backslashes when rendering string tags", func(t *testing.T) {
query := &Query{Tags: []*Tag{{Operator: "=", Value: `C:\test\`, Key: "key"}}}
So(strings.Join(query.renderTags(), ""), ShouldEqual, `"key" = 'C:\\test\\'`)
require.Equal(t, strings.Join(query.renderTags(), ""), `"key" = 'C:\\test\\'`)
})
Convey("can render regular measurement", func() {
t.Run("can render regular measurement", func(t *testing.T) {
query := &Query{Measurement: `apa`, Policy: "policy"}
So(query.renderMeasurement(), ShouldEqual, ` FROM "policy"."apa"`)
require.Equal(t, query.renderMeasurement(), ` FROM "policy"."apa"`)
})
Convey("can render regexp measurement", func() {
t.Run("can render regexp measurement", func(t *testing.T) {
query := &Query{Measurement: `/apa/`, Policy: "policy"}
So(query.renderMeasurement(), ShouldEqual, ` FROM "policy"./apa/`)
require.Equal(t, query.renderMeasurement(), ` FROM "policy"./apa/`)
})
})
}

View File

@@ -9,8 +9,8 @@ import (
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/plugins"
)
type ResponseParser struct{}
@@ -23,19 +23,21 @@ func init() {
legendFormat = regexp.MustCompile(`\[\[([\@\/\w-]+)(\.[\@\/\w-]+)*\]\]*|\$\s*([\@\/\w-]+?)*`)
}
// nolint:staticcheck // plugins.DataQueryResult deprecated
func (rp *ResponseParser) Parse(buf io.ReadCloser, query *Query) plugins.DataQueryResult {
var queryRes plugins.DataQueryResult
func (rp *ResponseParser) Parse(buf io.ReadCloser, query *Query) *backend.QueryDataResponse {
resp := backend.NewQueryDataResponse()
queryRes := backend.DataResponse{}
response, jsonErr := parseJSON(buf)
if jsonErr != nil {
queryRes.Error = jsonErr
return queryRes
resp.Responses["A"] = queryRes
return resp
}
if response.Error != "" {
queryRes.Error = fmt.Errorf(response.Error)
return queryRes
resp.Responses["A"] = queryRes
return resp
}
frames := data.Frames{}
@@ -45,9 +47,10 @@ func (rp *ResponseParser) Parse(buf io.ReadCloser, query *Query) plugins.DataQue
queryRes.Error = fmt.Errorf(result.Error)
}
}
queryRes.Dataframes = plugins.NewDecodedDataFrames(frames)
queryRes.Frames = frames
resp.Responses["A"] = queryRes
return queryRes
return resp
}
func parseJSON(buf io.ReadCloser) (Response, error) {

View File

@@ -8,41 +8,16 @@ import (
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/plugins"
"github.com/stretchr/testify/require"
"github.com/xorcare/pointer"
)
func prepare(text string) io.ReadCloser {
return ioutil.NopCloser(strings.NewReader(text))
}
// nolint:staticcheck // plugins.DataQueryResult deprecated
func decodedFrames(t *testing.T, result plugins.DataQueryResult) data.Frames {
decoded, err := result.Dataframes.Decoded()
require.NoError(t, err)
return decoded
}
// nolint:staticcheck // plugins.DataQueryResult deprecated
func assertSeriesName(t *testing.T, result plugins.DataQueryResult, index int, name string) {
decoded := decodedFrames(t, result)
frame := decoded[index]
require.Equal(t, frame.Name, name)
// the current version of the alerting-code does not use the dataframe-name
// when generating the metric-names for the alerts.
// instead, it goes through multiple attributes on the Field.
// we use the `field.Config.DisplayNameFromDS` attribute.
valueFieldConfig := frame.Fields[1].Config
require.NotNil(t, valueFieldConfig)
require.Equal(t, valueFieldConfig.DisplayNameFromDS, name)
}
func TestInfluxdbResponseParser(t *testing.T) {
t.Run("Influxdb response parser should handle invalid JSON", func(t *testing.T) {
parser := &ResponseParser{}
@@ -53,8 +28,8 @@ func TestInfluxdbResponseParser(t *testing.T) {
result := parser.Parse(prepare(response), query)
require.Nil(t, result.Dataframes)
require.Error(t, result.Error)
require.Nil(t, result.Responses["A"].Frames)
require.Error(t, result.Responses["A"].Error)
})
t.Run("Influxdb response parser should parse everything normally", func(t *testing.T) {
@@ -82,28 +57,28 @@ func TestInfluxdbResponseParser(t *testing.T) {
`
query := &Query{}
labels, err := data.LabelsFromString("datacenter=America")
require.Nil(t, err)
newField := data.NewField("value", labels, []*float64{
pointer.Float64(222), pointer.Float64(222), nil,
})
newField.Config = &data.FieldConfig{DisplayNameFromDS: "cpu.mean { datacenter: America }"}
testFrame := data.NewFrame("cpu.mean { datacenter: America }",
data.NewField("time", nil,
[]time.Time{
time.Date(1970, 1, 1, 0, 1, 51, 0, time.UTC),
time.Date(1970, 1, 1, 0, 1, 51, 0, time.UTC),
time.Date(1970, 1, 1, 0, 1, 51, 0, time.UTC),
}),
newField,
)
result := parser.Parse(prepare(response), query)
decoded := decodedFrames(t, result)
require.Len(t, decoded, 2)
frame1 := decoded[0]
frame2 := decoded[1]
assertSeriesName(t, result, 0, "cpu.mean { datacenter: America }")
assertSeriesName(t, result, 1, "cpu.sum { datacenter: America }")
require.Len(t, frame1.Fields, 2)
require.Len(t, frame2.Fields, 2)
require.Equal(t, frame1.Fields[0].Len(), 3)
require.Equal(t, frame1.Fields[1].Len(), 3)
require.Equal(t, frame2.Fields[0].Len(), 3)
require.Equal(t, frame2.Fields[1].Len(), 3)
require.Equal(t, *frame1.Fields[1].At(1).(*float64), 222.0)
require.Equal(t, *frame2.Fields[1].At(1).(*float64), 333.0)
require.Nil(t, frame1.Fields[1].At(2))
frame := result.Responses["A"]
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
})
t.Run("Influxdb response parser with invalid value-format", func(t *testing.T) {
@@ -131,28 +106,26 @@ func TestInfluxdbResponseParser(t *testing.T) {
query := &Query{}
newField := data.NewField("value", nil, []*float64{
pointer.Float64(50), nil, pointer.Float64(52),
})
newField.Config = &data.FieldConfig{DisplayNameFromDS: "cpu.mean"}
testFrame := data.NewFrame("cpu.mean",
data.NewField("time", nil,
[]time.Time{
time.Date(1970, 1, 1, 0, 1, 40, 0, time.UTC),
time.Date(1970, 1, 1, 0, 1, 41, 0, time.UTC),
time.Date(1970, 1, 1, 0, 1, 42, 0, time.UTC),
}),
newField,
)
result := parser.Parse(prepare(response), query)
// the current behavior is that we do not report an error, we turn the invalid value into `nil`
require.Nil(t, result.Error)
require.Equal(t, result.ErrorString, "")
decoded := decodedFrames(t, result)
require.Len(t, decoded, 1)
frame := decoded[0]
require.Len(t, frame.Fields, 2)
field1 := frame.Fields[0]
field2 := frame.Fields[1]
require.Equal(t, field1.Len(), 3)
require.Equal(t, field2.Len(), 3)
require.Equal(t, *field2.At(0).(*float64), 50.0)
require.Nil(t, field2.At(1))
require.Equal(t, *field2.At(2).(*float64), 52.0)
frame := result.Responses["A"]
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
})
t.Run("Influxdb response parser with invalid timestamp-format", func(t *testing.T) {
@@ -181,27 +154,25 @@ func TestInfluxdbResponseParser(t *testing.T) {
query := &Query{}
newField := data.NewField("value", nil, []*float64{
pointer.Float64(50), pointer.Float64(52),
})
newField.Config = &data.FieldConfig{DisplayNameFromDS: "cpu.mean"}
testFrame := data.NewFrame("cpu.mean",
data.NewField("time", nil,
[]time.Time{
time.Date(1970, 1, 1, 0, 1, 40, 0, time.UTC),
time.Date(1970, 1, 1, 0, 1, 42, 0, time.UTC),
}),
newField,
)
result := parser.Parse(prepare(response), query)
// the current behavior is that we do not report an error, we skip the item with the invalid timestamp
require.Nil(t, result.Error)
require.Equal(t, result.ErrorString, "")
decoded := decodedFrames(t, result)
require.Len(t, decoded, 1)
frame := decoded[0]
require.Len(t, frame.Fields, 2)
field1 := frame.Fields[0]
field2 := frame.Fields[1]
require.Equal(t, field1.Len(), 2)
require.Equal(t, field2.Len(), 2)
require.Equal(t, *field2.At(0).(*float64), 50.0)
require.Equal(t, *field2.At(1).(*float64), 52.0)
frame := result.Responses["A"]
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
})
t.Run("Influxdb response parser with alias", func(t *testing.T) {
@@ -233,76 +204,170 @@ func TestInfluxdbResponseParser(t *testing.T) {
`
query := &Query{Alias: "series alias"}
labels, err := data.LabelsFromString("/cluster/name/=Cluster/, @cluster@name@=Cluster@, cluster-name=Cluster, datacenter=America, dc.region.name=Northeast")
require.Nil(t, err)
newField := data.NewField("value", labels, []*float64{
pointer.Float64(222),
})
newField.Config = &data.FieldConfig{DisplayNameFromDS: "series alias"}
testFrame := data.NewFrame("series alias",
data.NewField("time", nil,
[]time.Time{
time.Date(1970, 1, 1, 0, 1, 51, 0, time.UTC),
}),
newField,
)
result := parser.Parse(prepare(response), query)
assertSeriesName(t, result, 0, "series alias")
frame := result.Responses["A"]
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
query = &Query{Alias: "alias $m $measurement", Measurement: "10m"}
result = parser.Parse(prepare(response), query)
assertSeriesName(t, result, 0, "alias 10m 10m")
frame = result.Responses["A"]
name := "alias 10m 10m"
testFrame.Name = name
testFrame.Fields[1].Config.DisplayNameFromDS = name
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
query = &Query{Alias: "alias $col", Measurement: "10m"}
result = parser.Parse(prepare(response), query)
assertSeriesName(t, result, 0, "alias mean")
assertSeriesName(t, result, 1, "alias sum")
frame = result.Responses["A"]
name = "alias mean"
testFrame.Name = name
testFrame.Fields[1].Config.DisplayNameFromDS = name
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
name = "alias sum"
testFrame.Name = name
newField = data.NewField("value", labels, []*float64{
pointer.Float64(333),
})
testFrame.Fields[1] = newField
testFrame.Fields[1].Config = &data.FieldConfig{DisplayNameFromDS: name}
if diff := cmp.Diff(testFrame, frame.Frames[1], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
query = &Query{Alias: "alias $tag_datacenter"}
result = parser.Parse(prepare(response), query)
assertSeriesName(t, result, 0, "alias America")
query = &Query{Alias: "alias $1"}
result = parser.Parse(prepare(response), query)
assertSeriesName(t, result, 0, "alias upc")
query = &Query{Alias: "alias $5"}
result = parser.Parse(prepare(response), query)
assertSeriesName(t, result, 0, "alias $5")
query = &Query{Alias: "series alias"}
result = parser.Parse(prepare(response), query)
assertSeriesName(t, result, 0, "series alias")
query = &Query{Alias: "alias [[m]] [[measurement]]", Measurement: "10m"}
result = parser.Parse(prepare(response), query)
assertSeriesName(t, result, 0, "alias 10m 10m")
frame = result.Responses["A"]
name = "alias America"
testFrame.Name = name
newField = data.NewField("value", labels, []*float64{
pointer.Float64(222),
})
testFrame.Fields[1] = newField
testFrame.Fields[1].Config = &data.FieldConfig{DisplayNameFromDS: name}
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
query = &Query{Alias: "alias [[col]]", Measurement: "10m"}
result = parser.Parse(prepare(response), query)
frame = result.Responses["A"]
name = "alias mean"
testFrame.Name = name
testFrame.Fields[1].Config.DisplayNameFromDS = name
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
assertSeriesName(t, result, 0, "alias mean")
assertSeriesName(t, result, 1, "alias sum")
query = &Query{Alias: "alias $1"}
result = parser.Parse(prepare(response), query)
frame = result.Responses["A"]
name = "alias upc"
testFrame.Name = name
testFrame.Fields[1].Config.DisplayNameFromDS = name
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
query = &Query{Alias: "alias $5"}
result = parser.Parse(prepare(response), query)
frame = result.Responses["A"]
name = "alias $5"
testFrame.Name = name
testFrame.Fields[1].Config.DisplayNameFromDS = name
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
query = &Query{Alias: "series alias"}
result = parser.Parse(prepare(response), query)
frame = result.Responses["A"]
name = "series alias"
testFrame.Name = name
testFrame.Fields[1].Config.DisplayNameFromDS = name
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
query = &Query{Alias: "alias [[m]] [[measurement]]", Measurement: "10m"}
result = parser.Parse(prepare(response), query)
frame = result.Responses["A"]
name = "alias 10m 10m"
testFrame.Name = name
testFrame.Fields[1].Config.DisplayNameFromDS = name
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
query = &Query{Alias: "alias [[tag_datacenter]]"}
result = parser.Parse(prepare(response), query)
assertSeriesName(t, result, 0, "alias America")
frame = result.Responses["A"]
name = "alias America"
testFrame.Name = name
testFrame.Fields[1].Config.DisplayNameFromDS = name
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
query = &Query{Alias: "alias [[tag_dc.region.name]]"}
result = parser.Parse(prepare(response), query)
assertSeriesName(t, result, 0, "alias Northeast")
frame = result.Responses["A"]
name = "alias Northeast"
testFrame.Name = name
testFrame.Fields[1].Config.DisplayNameFromDS = name
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
query = &Query{Alias: "alias [[tag_cluster-name]]"}
result = parser.Parse(prepare(response), query)
assertSeriesName(t, result, 0, "alias Cluster")
frame = result.Responses["A"]
name = "alias Cluster"
testFrame.Name = name
testFrame.Fields[1].Config.DisplayNameFromDS = name
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
query = &Query{Alias: "alias [[tag_/cluster/name/]]"}
result = parser.Parse(prepare(response), query)
assertSeriesName(t, result, 0, "alias Cluster/")
frame = result.Responses["A"]
name = "alias Cluster/"
testFrame.Name = name
testFrame.Fields[1].Config.DisplayNameFromDS = name
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
query = &Query{Alias: "alias [[tag_@cluster@name@]]"}
result = parser.Parse(prepare(response), query)
assertSeriesName(t, result, 0, "alias Cluster@")
frame = result.Responses["A"]
name = "alias Cluster@"
testFrame.Name = name
testFrame.Fields[1].Config.DisplayNameFromDS = name
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
})
t.Run("Influxdb response parser with errors", func(t *testing.T) {
@@ -333,19 +398,29 @@ func TestInfluxdbResponseParser(t *testing.T) {
`
query := &Query{}
labels, err := data.LabelsFromString("datacenter=America")
require.Nil(t, err)
newField := data.NewField("value", labels, []*float64{
pointer.Float64(222), pointer.Float64(222), nil,
})
newField.Config = &data.FieldConfig{DisplayNameFromDS: "cpu.mean { datacenter: America }"}
testFrame := data.NewFrame("cpu.mean { datacenter: America }",
data.NewField("time", nil,
[]time.Time{
time.Date(1970, 1, 1, 0, 1, 51, 0, time.UTC),
time.Date(1970, 1, 1, 0, 1, 51, 0, time.UTC),
time.Date(1970, 1, 1, 0, 1, 51, 0, time.UTC),
}),
newField,
)
result := parser.Parse(prepare(response), query)
decoded := decodedFrames(t, result)
frame := result.Responses["A"]
if diff := cmp.Diff(testFrame, frame.Frames[0], data.FrameTestCompareOptions()...); diff != "" {
t.Errorf("Result mismatch (-want +got):\n%s", diff)
}
require.Len(t, decoded, 2)
require.Equal(t, decoded[0].Fields[0].Len(), 3)
require.Equal(t, decoded[0].Fields[1].Len(), 3)
require.Equal(t, decoded[1].Fields[0].Len(), 3)
require.Equal(t, decoded[1].Fields[1].Len(), 3)
require.EqualError(t, result.Error, "query-timeout limit exceeded")
require.EqualError(t, result.Responses["A"].Error, "query-timeout limit exceeded")
})
t.Run("Influxdb response parser with top-level error", func(t *testing.T) {
@@ -361,9 +436,9 @@ func TestInfluxdbResponseParser(t *testing.T) {
result := parser.Parse(prepare(response), query)
require.Nil(t, result.Dataframes)
require.Nil(t, result.Responses["A"].Frames)
require.EqualError(t, result.Error, "error parsing query: found THING")
require.EqualError(t, result.Responses["A"].Error, "error parsing query: found THING")
})
t.Run("Influxdb response parser parseValue nil", func(t *testing.T) {

View File

@@ -13,7 +13,6 @@ import (
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/tsdb/azuremonitor"
"github.com/grafana/grafana/pkg/tsdb/cloudmonitoring"
"github.com/grafana/grafana/pkg/tsdb/influxdb"
"github.com/grafana/grafana/pkg/tsdb/loki"
"github.com/grafana/grafana/pkg/tsdb/mssql"
"github.com/grafana/grafana/pkg/tsdb/mysql"
@@ -56,7 +55,6 @@ type Service struct {
// Init initialises the service.
func (s *Service) Init() error {
s.registry["prometheus"] = prometheus.New(s.HTTPClientProvider)
s.registry["influxdb"] = influxdb.New(s.HTTPClientProvider)
s.registry["mssql"] = mssql.NewExecutor
s.registry["postgres"] = s.PostgresService.NewExecutor
s.registry["mysql"] = mysql.New(s.HTTPClientProvider)